package org.apache.flink.streaming.api.functions.sink;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.class */
public class SocketClientSinkTest extends TestLogger {
    private static final String TEST_MESSAGE = "testSocketSinkInvoke";
    private static final String EXCEPTION_MESSGAE = "Failed to send message 'testSocketSinkInvoke\n'";
    private static final String host = "127.0.0.1";
    private SerializationSchema<String> simpleSchema = new SerializationSchema<String>() { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.1
        public byte[] serialize(String str) {
            return str.getBytes(ConfigConstants.DEFAULT_CHARSET);
        }
    };

    @Test
    public void testSocketSink() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        final int localPort = serverSocket.getLocalPort();
        final AtomicReference atomicReference = new AtomicReference();
        Thread thread = new Thread("Test sink runner") { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    SocketClientSink socketClientSink = new SocketClientSink(SocketClientSinkTest.host, localPort, SocketClientSinkTest.this.simpleSchema, 0);
                    socketClientSink.open(new Configuration());
                    socketClientSink.invoke("testSocketSinkInvoke\n", SinkContextUtil.forTimestamp(0L));
                    socketClientSink.close();
                } catch (Throwable th) {
                    atomicReference.set(th);
                }
            }
        };
        thread.start();
        String readLine = new BufferedReader(new InputStreamReader(serverSocket.accept().getInputStream())).readLine();
        thread.join();
        serverSocket.close();
        if (atomicReference.get() != null) {
            Throwable th = (Throwable) atomicReference.get();
            th.printStackTrace();
            Assert.fail("Error in spawned thread: " + th.getMessage());
        }
        Assert.assertEquals(TEST_MESSAGE, readLine);
    }

    @Test
    public void testSinkAutoFlush() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        final SocketClientSink socketClientSink = new SocketClientSink(host, serverSocket.getLocalPort(), this.simpleSchema, 0, true);
        socketClientSink.open(new Configuration());
        final AtomicReference atomicReference = new AtomicReference();
        Thread thread = new Thread("Test sink runner") { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    socketClientSink.invoke("testSocketSinkInvoke\n", SinkContextUtil.forTimestamp(0L));
                } catch (Throwable th) {
                    atomicReference.set(th);
                }
            }
        };
        thread.start();
        String readLine = new BufferedReader(new InputStreamReader(serverSocket.accept().getInputStream())).readLine();
        thread.join();
        socketClientSink.close();
        serverSocket.close();
        if (atomicReference.get() != null) {
            Throwable th = (Throwable) atomicReference.get();
            th.printStackTrace();
            Assert.fail("Error in spawned thread: " + th.getMessage());
        }
        Assert.assertEquals(TEST_MESSAGE, readLine);
    }

    @Test
    public void testSocketSinkNoRetry() throws Exception {
        final ServerSocket serverSocket = new ServerSocket(0);
        int localPort = serverSocket.getLocalPort();
        try {
            final AtomicReference atomicReference = new AtomicReference();
            Thread thread = new Thread("Test server runner") { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.4
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        serverSocket.accept().close();
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            thread.start();
            SocketClientSink socketClientSink = new SocketClientSink(host, localPort, this.simpleSchema, 0, true);
            socketClientSink.open(new Configuration());
            thread.join();
            if (atomicReference.get() != null) {
                Throwable th = (Throwable) atomicReference.get();
                th.printStackTrace();
                Assert.fail("Error in server thread: " + th.getMessage());
            }
            while (true) {
                try {
                    socketClientSink.invoke("testSocketSinkInvoke\n", SinkContextUtil.forTimestamp(0L));
                } catch (IOException e) {
                    Assert.assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
                    Assert.assertEquals(0L, socketClientSink.getCurrentNumberOfRetries());
                    IOUtils.closeQuietly(serverSocket);
                    return;
                } catch (Exception e2) {
                    Assert.fail("wrong exception: " + e2.getClass().getName() + " - " + e2.getMessage());
                    Assert.assertEquals(0L, socketClientSink.getCurrentNumberOfRetries());
                    IOUtils.closeQuietly(serverSocket);
                    return;
                }
            }
        } catch (Throwable th2) {
            IOUtils.closeQuietly(serverSocket);
            throw th2;
        }
    }

    @Test
    public void testRetry() throws Exception {
        final ServerSocket[] serverSocketArr = new ServerSocket[1];
        ExecutorService[] executorServiceArr = new ExecutorService[1];
        try {
            serverSocketArr[0] = new ServerSocket(0);
            executorServiceArr[0] = Executors.newCachedThreadPool();
            int localPort = serverSocketArr[0].getLocalPort();
            Future submit = executorServiceArr[0].submit(new Callable<Void>() { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.5
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    Socket accept = serverSocketArr[0].accept();
                    Assert.assertEquals("0", new BufferedReader(new InputStreamReader(accept.getInputStream())).readLine());
                    accept.close();
                    return null;
                }
            });
            final SocketClientSink socketClientSink = new SocketClientSink(host, serverSocketArr[0].getLocalPort(), this.simpleSchema, -1, true);
            socketClientSink.open(new Configuration());
            socketClientSink.invoke("0\n", SinkContextUtil.forTimestamp(0L));
            submit.get();
            serverSocketArr[0].close();
            Assert.assertTrue(serverSocketArr[0].isClosed());
            Assert.assertEquals(0L, socketClientSink.getCurrentNumberOfRetries());
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            new CountDownLatch(1);
            executorServiceArr[0].submit(new Callable<Void>() { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.6
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    while (countDownLatch.getCount() != 0) {
                        socketClientSink.invoke("1\n");
                    }
                    return null;
                }
            });
            while (socketClientSink.getCurrentNumberOfRetries() == 0) {
                Thread.sleep(100L);
            }
            countDownLatch.countDown();
            serverSocketArr[0] = new ServerSocket(localPort);
            Assert.assertEquals("1", new BufferedReader(new InputStreamReader(serverSocketArr[0].accept().getInputStream())).readLine());
            if (serverSocketArr[0] != null) {
                serverSocketArr[0].close();
            }
            if (executorServiceArr[0] != null) {
                executorServiceArr[0].shutdown();
            }
        } catch (Throwable th) {
            if (serverSocketArr[0] != null) {
                serverSocketArr[0].close();
            }
            if (executorServiceArr[0] != null) {
                executorServiceArr[0].shutdown();
            }
            throw th;
        }
    }
}
