package com.datatorrent.contrib.splunk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/* loaded from: input_file:com/datatorrent/contrib/splunk/SplunkInputFromForwarder.class */
/**
 * Accepts a TCP connection on a configurable port, reads it line by line and
 * publishes each line, converted by {@link #getMessage(String)}, to a Kafka topic.
 *
 * <p>Typical lifecycle: configure via the setters, call {@link #startServer()},
 * call {@link #process()} to serve one connection, and finally call
 * {@link #stopServer()} to release all resources.
 *
 * @param <T> type of the Kafka message payload produced from each input line
 */
public abstract class SplunkInputFromForwarder<T> {
    /** Port used when none is configured through {@link #setPort(int)}. */
    private static final int DEFAULT_PORT = 6789;

    protected Producer<String, T> producer;
    private String topic;
    protected ServerSocket serverSocket;
    protected Socket connectionSocket;
    private Properties configProperties = new Properties();
    private int port = DEFAULT_PORT;

    /**
     * @return the TCP port the server socket is bound to by {@link #startServer()}
     */
    public int getPort() {
        return this.port;
    }

    /**
     * @param i the TCP port to listen on; takes effect on the next {@link #startServer()}
     */
    public void setPort(int i) {
        this.port = i;
    }

    /**
     * @return the Kafka topic messages are published to
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * @param str the Kafka topic messages are published to
     */
    public void setTopic(String str) {
        this.topic = str;
    }

    /**
     * @return the properties handed to the Kafka {@link ProducerConfig} in {@link #startServer()}
     */
    public Properties getConfigProperties() {
        return this.configProperties;
    }

    /**
     * @param properties the properties handed to the Kafka {@link ProducerConfig}
     *                   in {@link #startServer()}
     */
    public void setConfigProperties(Properties properties) {
        this.configProperties = properties;
    }

    /**
     * Converts one line of input into the payload to publish.
     *
     * @param str a single line read from the connection, never {@code null}
     * @return the payload to publish, or {@code null} to skip this line
     */
    public abstract T getMessage(String str);

    /**
     * Converts a line with {@link #getMessage(String)} and sends the result to the
     * configured topic. Nothing is sent when the line or the converted payload is
     * {@code null}.
     *
     * @param str the line to publish, may be {@code null}
     */
    public void writeToKafka(String str) {
        if (str == null) {
            return;
        }
        T message = getMessage(str);
        if (message != null) {
            // Explicit type arguments instead of the raw type: avoids an unchecked call.
            this.producer.send(new KeyedMessage<String, T>(getTopic(), message));
        }
    }

    /**
     * Creates the Kafka producer from the configured properties and binds the
     * server socket to the configured port.
     *
     * @throws IOException if the server socket cannot be opened
     */
    public void startServer() throws IOException {
        Producer<String, T> newProducer = new Producer<>(new ProducerConfig(this.configProperties));
        try {
            this.serverSocket = new ServerSocket(this.port);
        } catch (IOException | RuntimeException e) {
            // Do not leak the producer when the port cannot be bound.
            newProducer.close();
            throw e;
        }
        this.producer = newProducer;
    }

    /**
     * Blocks until a client connects, then forwards every line it sends to Kafka
     * via {@link #writeToKafka(String)}. Returns once the client closes the
     * connection; the accepted socket is closed before returning.
     *
     * @throws IOException if accepting the connection or reading from it fails
     */
    public void process() throws IOException {
        this.connectionSocket = this.serverSocket.accept();
        // One reader for the whole connection: a BufferedReader reads ahead, so
        // creating a new one per line would discard already-buffered input.
        // Closing the reader also closes the underlying socket.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(this.connectionSocket.getInputStream()))) {
            String line;
            // readLine() returns null at end of stream; stop instead of spinning.
            while ((line = reader.readLine()) != null) {
                writeToKafka(line);
            }
        }
    }

    /**
     * Closes the server socket, the accepted connection (if any) and the Kafka
     * producer. Resources that were never created are skipped, and a failure to
     * close one resource does not prevent the remaining ones from being closed.
     *
     * @throws IOException if closing one of the sockets fails
     */
    public void stopServer() throws IOException {
        try {
            if (this.serverSocket != null) {
                this.serverSocket.close();
            }
        } finally {
            try {
                if (this.connectionSocket != null) {
                    this.connectionSocket.close();
                }
            } finally {
                if (this.producer != null) {
                    this.producer.close();
                }
            }
        }
    }
}
