package org.apache.storm.sql.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.kafka.trident.TridentKafkaState;
import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Function;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.trident.tuple.TridentTuple;

/* loaded from: input_file:org/apache/storm/sql/kafka/KafkaDataSourcesProvider.class */
public class KafkaDataSourcesProvider implements DataSourcesProvider {
    private static final int DEFAULT_ZK_PORT = 2181;

    /* loaded from: input_file:org/apache/storm/sql/kafka/KafkaDataSourcesProvider$KafkaTridentDataSource.class */
    private static class KafkaTridentDataSource implements ISqlTridentDataSource {
        private final TridentKafkaConfig conf;
        private final String topic;
        private final int primaryKeyIndex;
        private final List<String> fields;
        private final Properties producerProperties;

        private KafkaTridentDataSource(TridentKafkaConfig tridentKafkaConfig, String str, int i, Properties properties, List<String> list) {
            this.conf = tridentKafkaConfig;
            this.topic = str;
            this.primaryKeyIndex = i;
            this.producerProperties = properties;
            this.fields = list;
        }

        public ITridentDataSource getProducer() {
            return new OpaqueTridentKafkaSpout(this.conf);
        }

        public Function getConsumer() {
            return new KafkaTridentSink(this.topic, this.primaryKeyIndex, this.producerProperties, this.fields);
        }
    }

    /* loaded from: input_file:org/apache/storm/sql/kafka/KafkaDataSourcesProvider$KafkaTridentSink.class */
    static class KafkaTridentSink extends BaseFunction {
        private transient TridentKafkaState state;
        private final String topic;
        private final int primaryKeyIndex;
        private final Properties producerProperties;
        private final List<String> fieldNames;

        private KafkaTridentSink(String str, int i, Properties properties, List<String> list) {
            this.topic = str;
            this.primaryKeyIndex = i;
            this.producerProperties = properties;
            this.fieldNames = list;
        }

        public void cleanup() {
            super.cleanup();
        }

        public void prepare(Map map, TridentOperationContext tridentOperationContext) {
            this.state = new TridentKafkaState().withKafkaTopicSelector(new DefaultTopicSelector(this.topic)).withTridentTupleToKafkaMapper(new SqlKafkaMapper(this.primaryKeyIndex, new JsonSerializer(this.fieldNames)));
            this.state.prepare(this.producerProperties);
        }

        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            this.state.updateState(Collections.singletonList(tridentTuple), tridentCollector);
        }
    }

    /* loaded from: input_file:org/apache/storm/sql/kafka/KafkaDataSourcesProvider$SqlKafkaMapper.class */
    private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
        private final int primaryKeyIndex;
        private final IOutputSerializer serializer;

        private SqlKafkaMapper(int i, IOutputSerializer iOutputSerializer) {
            this.primaryKeyIndex = i;
            this.serializer = iOutputSerializer;
        }

        public Object getKeyFromTuple(TridentTuple tridentTuple) {
            return tridentTuple.get(this.primaryKeyIndex);
        }

        /* renamed from: getMessageFromTuple, reason: merged with bridge method [inline-methods] */
        public ByteBuffer m0getMessageFromTuple(TridentTuple tridentTuple) {
            return this.serializer.write(tridentTuple.getValues(), (ByteBuffer) null);
        }
    }

    public String scheme() {
        return "kafka";
    }

    public DataSource construct(URI uri, String str, String str2, List<FieldInfo> list) {
        throw new UnsupportedOperationException();
    }

    public ISqlTridentDataSource constructTrident(URI uri, String str, String str2, String str3, List<FieldInfo> list) {
        ZkHosts zkHosts = new ZkHosts(uri.getHost() + ":" + (uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT), uri.getPath());
        String str4 = parseURIParams(uri.getQuery()).get("topic");
        Preconditions.checkNotNull(str4, "No topic of the spout is specified");
        TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(zkHosts, str4);
        ArrayList arrayList = new ArrayList();
        int i = -1;
        for (int i2 = 0; i2 < list.size(); i2++) {
            FieldInfo fieldInfo = list.get(i2);
            arrayList.add(fieldInfo.name());
            if (fieldInfo.isPrimary()) {
                i = i2;
            }
        }
        Preconditions.checkState(i != -1, "Kafka stream table must have a primary key");
        tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new JsonScheme(arrayList));
        ObjectMapper objectMapper = new ObjectMapper();
        Properties properties = new Properties();
        try {
            HashMap hashMap = (HashMap) ((HashMap) objectMapper.readValue(str3, HashMap.class)).get("producer");
            Preconditions.checkNotNull(hashMap, "Kafka Table must contain producer config");
            properties.putAll(hashMap);
            return new KafkaTridentDataSource(tridentKafkaConfig, str4, i, properties, arrayList);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static Map<String, String> parseURIParams(String str) {
        HashMap hashMap = new HashMap();
        if (str == null) {
            return hashMap;
        }
        for (String str2 : str.split("&")) {
            String[] split = str2.split("=", 2);
            if (split.length > 1) {
                hashMap.put(split[0], split[1]);
            }
        }
        return hashMap;
    }
}
