package org.apache.storm.sql.runtime.datasource.socket.spout;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.storm.Config;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.class */
public class SocketSpout implements IRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(SocketSpout.class);
    private final String host;
    private final int port;
    private final Scheme scheme;
    private volatile boolean running;
    private BlockingDeque<List<Object>> queue;
    private Socket socket;
    private Thread readerThread;
    private BufferedReader in;
    private ObjectMapper objectMapper;
    private SpoutOutputCollector collector;
    private Map<String, List<Object>> emitted;

    /* loaded from: input_file:org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout$SocketReaderRunnable.class */
    private class SocketReaderRunnable implements Runnable {
        private SocketReaderRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            String readLine;
            while (SocketSpout.this.running) {
                try {
                    readLine = SocketSpout.this.in.readLine();
                } catch (Throwable th) {
                    SocketSpout.this.die(th);
                }
                if (readLine == null) {
                    throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
                    break;
                } else {
                    SocketSpout.this.queue.push(SocketSpout.this.convertLineToTuple(readLine.trim()));
                }
            }
        }
    }

    public SocketSpout(Scheme scheme, String str, int i) {
        this.scheme = scheme;
        this.host = str;
        this.port = i;
    }

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.queue = new LinkedBlockingDeque();
        this.emitted = new HashMap();
        this.objectMapper = new ObjectMapper();
        try {
            this.socket = new Socket(this.host, this.port);
            this.in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            this.readerThread = new Thread(new SocketReaderRunnable());
            this.readerThread.start();
        } catch (IOException e) {
            throw new RuntimeException("Error opening socket: host " + this.host + " port " + this.port);
        }
    }

    public void close() {
        this.running = false;
        this.readerThread.interrupt();
        this.queue.clear();
        closeQuietly(this.in);
        closeQuietly(this.socket);
    }

    public void activate() {
        this.running = true;
    }

    public void deactivate() {
        this.running = false;
    }

    public void nextTuple() {
        List<Object> poll;
        if (this.queue.peek() == null || (poll = this.queue.poll()) == null) {
            return;
        }
        String uuid = UUID.randomUUID().toString();
        this.emitted.put(uuid, poll);
        this.collector.emit(poll, uuid);
    }

    private List<Object> convertLineToTuple(String str) {
        return this.scheme.deserialize(ByteBuffer.wrap(str.getBytes()));
    }

    public void ack(Object obj) {
        this.emitted.remove(obj);
    }

    public void fail(Object obj) {
        List<Object> remove = this.emitted.remove(obj);
        if (remove != null) {
            this.queue.addLast(remove);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.scheme.getOutputFields());
    }

    public Map<String, Object> getComponentConfiguration() {
        Config config = new Config();
        config.setMaxTaskParallelism(1);
        return config;
    }

    private void die(Throwable th) {
        LOG.error("Halting process: TridentSocketSpout died.", th);
        if (this.running || (th instanceof Error)) {
            System.exit(11);
        }
    }

    private void closeQuietly(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
            }
        }
    }
}
