package org.apache.spark.streaming;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/streaming/JavaReceiverAPISuite.class */
public class JavaReceiverAPISuite implements Serializable {
    @Before
    public void setUp() {
        System.clearProperty("spark.streaming.clock");
    }

    @After
    public void tearDown() {
        System.clearProperty("spark.streaming.clock");
    }

    @Test
    public void testReceiver() throws InterruptedException {
        TestServer testServer = new TestServer(0);
        testServer.start();
        final AtomicLong atomicLong = new AtomicLong(0L);
        try {
            JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[2]", "test", new Duration(200L));
            javaStreamingContext.receiverStream(new JavaSocketReceiver("localhost", testServer.port())).map(new Function<String, String>() { // from class: org.apache.spark.streaming.JavaReceiverAPISuite.1
                public String call(String str) throws Exception {
                    return str + ".";
                }
            }).foreachRDD(new Function<JavaRDD<String>, Void>() { // from class: org.apache.spark.streaming.JavaReceiverAPISuite.2
                public Void call(JavaRDD<String> javaRDD) throws Exception {
                    atomicLong.addAndGet(javaRDD.count());
                    return null;
                }
            });
            javaStreamingContext.start();
            long currentTimeMillis = System.currentTimeMillis();
            Thread.sleep(200L);
            for (int i = 0; i < 6; i++) {
                testServer.send("" + i + "\n");
                Thread.sleep(100L);
            }
            while (atomicLong.get() == 0 && System.currentTimeMillis() - currentTimeMillis < 10000) {
                Thread.sleep(100L);
            }
            javaStreamingContext.stop();
            Assert.assertTrue(atomicLong.get() > 0);
            testServer.stop();
        } catch (Throwable th) {
            testServer.stop();
            throw th;
        }
    }
}
