package org.apache.eagle.dataproc.impl.storm.kafka;

import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import java.util.Arrays;
import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
import org.apache.eagle.notification.base.NotificationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

/* loaded from: input_file:org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.class */
public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
    private String configPrefix;

    public SchemeAsMultiScheme getStreamScheme(String str, Config config) {
        return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(str, config));
    }

    public KafkaSourcedSpoutProvider() {
        this.configPrefix = "dataSourceConfig";
    }

    public KafkaSourcedSpoutProvider(String str) {
        this.configPrefix = "dataSourceConfig";
        this.configPrefix = str;
    }

    @Override // org.apache.eagle.dataproc.impl.storm.StormSpoutProvider
    public BaseRichSpout getSpout(Config config) {
        Config config2 = config;
        if (this.configPrefix != null) {
            config2 = config.getConfig(this.configPrefix);
        }
        String string = config2.getString(NotificationConstants.TOPIC);
        String string2 = config2.getString("consumerGroupId");
        int i = config2.getInt("fetchSize");
        String string3 = config2.getString("deserializerClass");
        String string4 = config2.getString("zkConnection");
        String string5 = config2.getString("transactionZKRoot");
        LOG.info(String.format("Use topic id: %s", string));
        String str = null;
        if (config2.hasPath("brokerZkPath")) {
            str = config2.getString("brokerZkPath");
        }
        SpoutConfig spoutConfig = new SpoutConfig(str == null ? new ZkHosts(string4) : new ZkHosts(string4, str), string, string5 + "/" + string, string2);
        spoutConfig.zkServers = Arrays.asList(config2.getString("transactionZKServers").split(","));
        spoutConfig.zkPort = Integer.valueOf(config2.getInt("transactionZKPort"));
        spoutConfig.stateUpdateIntervalMs = config2.getLong("transactionStateUpdateMS");
        spoutConfig.fetchSizeBytes = i;
        if (config2.hasPath("startOffsetTime")) {
            spoutConfig.startOffsetTime = config2.getInt("startOffsetTime");
        }
        if (config2.hasPath("forceFromStart")) {
            spoutConfig.forceFromStart = config2.getBoolean("forceFromStart");
        }
        spoutConfig.scheme = getStreamScheme(string3, config2);
        return new KafkaSpout(spoutConfig);
    }
}
