package ly.bit.nsq;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import ly.bit.nsq.exceptions.NSQException;
import ly.bit.nsq.util.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ly/bit/nsq/BasicConnection.class */
public class BasicConnection extends Connection {
    private static final Logger log = LoggerFactory.getLogger(BasicConnection.class);
    private Socket sock;
    private InputStream inputStream;

    @Override // ly.bit.nsq.Connection
    public void init(String str, int i, NSQReader nSQReader) {
        this.host = str;
        this.reader = nSQReader;
        this.port = i;
        this.sock = new Socket();
    }

    @Override // ly.bit.nsq.Connection
    public synchronized void send(String str) throws NSQException {
        try {
            this.sock.getOutputStream().write(str.getBytes());
        } catch (IOException e) {
            throw new NSQException(e);
        }
    }

    public byte[] readN(int i) throws IOException {
        byte[] bArr = new byte[i];
        this.inputStream.read(bArr);
        return bArr;
    }

    public byte[] readResponse() throws NSQException {
        try {
            return readN(new DataInputStream(this.inputStream).readInt());
        } catch (IOException e) {
            throw new NSQException(e);
        }
    }

    @Override // ly.bit.nsq.Connection
    public void connect() throws NSQException {
        try {
            this.sock.connect(new InetSocketAddress(this.host, this.port));
            send(ConnectionUtils.MAGIC_V2);
            this.inputStream = new BufferedInputStream(this.sock.getInputStream());
        } catch (IOException e) {
            throw new NSQException(e);
        }
    }

    @Override // ly.bit.nsq.Connection
    public void readForever() {
        new Thread(new Runnable() { // from class: ly.bit.nsq.BasicConnection.1ReadThis
            @Override // java.lang.Runnable
            public void run() {
                while (!BasicConnection.this.closed.get()) {
                    try {
                        try {
                            BasicConnection.this.handleResponse(BasicConnection.this.readResponse());
                        } catch (NSQException e) {
                            BasicConnection.log.error("Message error: ", e);
                        }
                    } catch (NSQException e2) {
                        BasicConnection.this.close();
                        return;
                    }
                }
            }
        }, toString()).start();
    }

    @Override // ly.bit.nsq.Connection
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        log.info("Closing connection {}", toString());
        try {
            this.sock.close();
        } catch (IOException e) {
            log.error("Exception closing connection: ", e);
        }
        this.reader.connections.remove(toString());
    }
}
