/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.sql.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.kafka.trident.TridentKafkaState;
import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.sql.kafka.JsonScheme;
import org.apache.storm.sql.kafka.JsonSerializer;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Function;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.trident.tuple.TridentTuple;

public class KafkaDataSourcesProvider
implements DataSourcesProvider {
    private static final int DEFAULT_ZK_PORT = 2181;

    public String scheme() {
        return "kafka";
    }

    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
        throw new UnsupportedOperationException();
    }

    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, String properties, List<FieldInfo> fields) {
        int port = uri.getPort() != -1 ? uri.getPort() : 2181;
        ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
        Map<String, String> values = KafkaDataSourcesProvider.parseURIParams(uri.getQuery());
        String topic = values.get("topic");
        Preconditions.checkNotNull((Object)topic, (Object)"No topic of the spout is specified");
        TridentKafkaConfig conf = new TridentKafkaConfig((BrokerHosts)zk, topic);
        ArrayList<String> fieldNames = new ArrayList<String>();
        int primaryIndex = -1;
        for (int i = 0; i < fields.size(); ++i) {
            FieldInfo f = fields.get(i);
            fieldNames.add(f.name());
            if (!f.isPrimary()) continue;
            primaryIndex = i;
        }
        Preconditions.checkState((primaryIndex != -1 ? 1 : 0) != 0, (Object)"Kafka stream table must have a primary key");
        conf.scheme = new SchemeAsMultiScheme((Scheme)new JsonScheme(fieldNames));
        ObjectMapper mapper = new ObjectMapper();
        Properties producerProp = new Properties();
        try {
            HashMap map = (HashMap)mapper.readValue(properties, HashMap.class);
            HashMap producerConfig = (HashMap)map.get("producer");
            Preconditions.checkNotNull((Object)producerConfig, (Object)"Kafka Table must contain producer config");
            producerProp.putAll((Map<?, ?>)producerConfig);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new KafkaTridentDataSource(conf, topic, primaryIndex, producerProp, fieldNames);
    }

    private static Map<String, String> parseURIParams(String query) {
        String[] params;
        HashMap<String, String> res = new HashMap<String, String>();
        if (query == null) {
            return res;
        }
        for (String p : params = query.split("&")) {
            String[] v = p.split("=", 2);
            if (v.length <= 1) continue;
            res.put(v[0], v[1]);
        }
        return res;
    }

    private static class KafkaTridentDataSource
    implements ISqlTridentDataSource {
        private final TridentKafkaConfig conf;
        private final String topic;
        private final int primaryKeyIndex;
        private final List<String> fields;
        private final Properties producerProperties;

        private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex, Properties producerProperties, List<String> fields) {
            this.conf = conf;
            this.topic = topic;
            this.primaryKeyIndex = primaryKeyIndex;
            this.producerProperties = producerProperties;
            this.fields = fields;
        }

        public ITridentDataSource getProducer() {
            return new OpaqueTridentKafkaSpout(this.conf);
        }

        public Function getConsumer() {
            return new KafkaTridentSink(this.topic, this.primaryKeyIndex, this.producerProperties, this.fields);
        }
    }

    static class KafkaTridentSink
    extends BaseFunction {
        private transient TridentKafkaState state;
        private final String topic;
        private final int primaryKeyIndex;
        private final Properties producerProperties;
        private final List<String> fieldNames;

        private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties, List<String> fieldNames) {
            this.topic = topic;
            this.primaryKeyIndex = primaryKeyIndex;
            this.producerProperties = producerProperties;
            this.fieldNames = fieldNames;
        }

        public void cleanup() {
            super.cleanup();
        }

        public void prepare(Map conf, TridentOperationContext context) {
            JsonSerializer serializer = new JsonSerializer(this.fieldNames);
            SqlKafkaMapper m = new SqlKafkaMapper(this.primaryKeyIndex, serializer);
            this.state = new TridentKafkaState().withKafkaTopicSelector((KafkaTopicSelector)new StaticTopicSelector(this.topic)).withTridentTupleToKafkaMapper((TridentTupleToKafkaMapper)m);
            this.state.prepare(this.producerProperties);
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            this.state.updateState(Collections.singletonList(tuple), collector);
        }
    }

    private static class SqlKafkaMapper
    implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
        private final int primaryKeyIndex;
        private final IOutputSerializer serializer;

        private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
            this.primaryKeyIndex = primaryKeyIndex;
            this.serializer = serializer;
        }

        public Object getKeyFromTuple(TridentTuple tuple) {
            return tuple.get(this.primaryKeyIndex);
        }

        public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
            return this.serializer.write(tuple.getValues(), null);
        }
    }

    private static class StaticTopicSelector
    implements KafkaTopicSelector {
        private final String topic;

        private StaticTopicSelector(String topic) {
            this.topic = topic;
        }

        public String getTopic(TridentTuple tuple) {
            return this.topic;
        }
    }
}

