/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.sql.runtime.datasource.socket.spout;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.storm.Config;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketSpout
implements IRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(SocketSpout.class);
    private final String host;
    private final int port;
    private final Scheme scheme;
    private volatile boolean running;
    private BlockingDeque<List<Object>> queue;
    private Socket socket;
    private Thread readerThread;
    private BufferedReader in;
    private ObjectMapper objectMapper;
    private SpoutOutputCollector collector;
    private Map<String, List<Object>> emitted;

    public SocketSpout(Scheme scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingDeque<List<Object>>();
        this.emitted = new HashMap<String, List<Object>>();
        this.objectMapper = new ObjectMapper();
        try {
            this.socket = new Socket(this.host, this.port);
            this.in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
        }
        catch (IOException e) {
            throw new RuntimeException("Error opening socket: host " + this.host + " port " + this.port);
        }
        this.readerThread = new Thread(new SocketReaderRunnable());
        this.readerThread.start();
    }

    public void close() {
        this.running = false;
        this.readerThread.interrupt();
        this.queue.clear();
        this.closeQuietly(this.in);
        this.closeQuietly(this.socket);
    }

    public void activate() {
        this.running = true;
    }

    public void deactivate() {
        this.running = false;
    }

    public void nextTuple() {
        List<Object> values;
        if (this.queue.peek() != null && (values = this.queue.poll()) != null) {
            String id = UUID.randomUUID().toString();
            this.emitted.put(id, values);
            this.collector.emit(values, (Object)id);
        }
    }

    private List<Object> convertLineToTuple(String line) {
        return this.scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
    }

    public void ack(Object msgId) {
        this.emitted.remove(msgId);
    }

    public void fail(Object msgId) {
        List<Object> emittedValues = this.emitted.remove(msgId);
        if (emittedValues != null) {
            this.queue.addLast(emittedValues);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.scheme.getOutputFields());
    }

    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    private void die(Throwable t) {
        LOG.error("Halting process: TridentSocketSpout died.", t);
        if (this.running || t instanceof Error) {
            System.exit(11);
        }
    }

    private void closeQuietly(Closeable closeable) {
        try {
            if (closeable != null) {
                closeable.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    private class SocketReaderRunnable
    implements Runnable {
        private SocketReaderRunnable() {
        }

        @Override
        public void run() {
            while (SocketSpout.this.running) {
                try {
                    String line = SocketSpout.this.in.readLine();
                    if (line == null) {
                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
                    }
                    List values = SocketSpout.this.convertLineToTuple(line.trim());
                    SocketSpout.this.queue.push(values);
                }
                catch (Throwable t) {
                    SocketSpout.this.die(t);
                }
            }
        }
    }
}

