package org.apache.flink.streaming.api.functions.sink;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/SocketClientSink.class */
/**
 * Sink that writes every incoming record to a TCP socket.
 *
 * <p>Each record is converted to a byte array by the supplied
 * {@link SerializationSchema} and written to the socket's output stream.
 * The connection is established in {@link #open(Configuration)} and torn
 * down in {@link #close()}.
 *
 * @param <IN> type of the records consumed by this sink
 */
public class SocketClientSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;

    /** Host name of the socket server to connect to. */
    private final String hostName;

    /** Port of the socket server to connect to. */
    private final int port;

    /** Converts records into the bytes that are written to the socket. */
    private final SerializationSchema<IN, byte[]> schema;

    /** Open connection; transient, so it is not part of the serialized sink. */
    private transient Socket client;

    /** Stream wrapping the socket's output; transient like {@link #client}. */
    private transient DataOutputStream dataOutputStream;

    /**
     * Creates a sink that will connect to the given host and port.
     *
     * @param str host name of the socket server
     * @param i port of the socket server
     * @param serializationSchema schema used to turn records into bytes
     */
    public SocketClientSink(String str, int i, SerializationSchema<IN, byte[]> serializationSchema) {
        this.hostName = str;
        this.port = i;
        this.schema = serializationSchema;
    }

    /**
     * Opens the socket and wraps its output stream.
     *
     * <p>The fields are only assigned once both steps succeeded, so a failed
     * attempt never leaves a half-initialized connection behind.
     *
     * @throws RuntimeException if the connection cannot be established
     */
    public void intializeConnection() {
        Socket socket = null;
        try {
            socket = new Socket(this.hostName, this.port);
            this.dataOutputStream = new DataOutputStream(socket.getOutputStream());
            this.client = socket;
        } catch (IOException e) {
            if (socket != null) {
                // The socket was opened but its stream could not be obtained:
                // release it so the failed attempt does not leak a connection.
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Best effort only; the original failure is reported below.
                }
            }
            throw new RuntimeException("Cannot initialize connection to socket server at " + this.hostName + ":" + this.port, e);
        }
    }

    /**
     * Serializes the record and writes the resulting bytes to the socket.
     *
     * @param in record to send
     * @throws RuntimeException if writing to the socket fails
     */
    @Override
    public void invoke(IN in) {
        try {
            this.dataOutputStream.write(this.schema.serialize(in));
        } catch (IOException e) {
            // Plain concatenation is null-safe, unlike in.toString(), so a null
            // record cannot replace the real I/O failure with a NullPointerException.
            throw new RuntimeException("Cannot send message " + in + " to socket server at " + this.hostName + ":" + this.port, e);
        }
    }

    /**
     * Flushes pending output and closes the socket.
     *
     * <p>Safe to call when the connection was never (fully) opened: missing
     * stream or socket are simply skipped. The socket is closed even when the
     * flush fails.
     *
     * @throws RuntimeException if flushing or closing fails
     */
    private void closeConnection() {
        try {
            if (this.dataOutputStream != null) {
                this.dataOutputStream.flush();
            }
        } catch (IOException e) {
            throw new RuntimeException("Error while closing connection with socket server at " + this.hostName + ":" + this.port, e);
        } finally {
            if (this.client != null) {
                try {
                    this.client.close();
                } catch (IOException e) {
                    throw new RuntimeException("Cannot close connection with socket server at " + this.hostName + ":" + this.port, e);
                }
            }
        }
    }

    /**
     * Establishes the socket connection before any record is processed.
     *
     * @param configuration configuration passed in by the caller; not used here
     */
    public void open(Configuration configuration) {
        intializeConnection();
    }

    /** Flushes and closes the socket connection. */
    public void close() {
        closeConnection();
    }
}
