package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

/* loaded from: input_file:org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.class */
public class JavaKafkaStreamSuite implements Serializable {
    private transient JavaStreamingContext ssc = null;
    private transient Random random = new Random();
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        this.ssc = new JavaStreamingContext(new SparkConf().setMaster("local[4]").setAppName(getClass().getSimpleName()), new Duration(500L));
    }

    @After
    public void tearDown() {
        if (this.ssc != null) {
            this.ssc.stop();
            this.ssc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaStream() throws InterruptedException {
        HashMap hashMap = new HashMap();
        hashMap.put("topic1", 1);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("a", 5);
        hashMap2.put("b", 3);
        hashMap2.put("c", 10);
        this.kafkaTestUtils.createTopic("topic1");
        this.kafkaTestUtils.sendMessages("topic1", hashMap2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("zookeeper.connect", this.kafkaTestUtils.zkAddress());
        hashMap3.put("group.id", "test-consumer-" + this.random.nextInt(10000));
        hashMap3.put("auto.offset.reset", "smallest");
        JavaPairReceiverInputDStream createStream = KafkaUtils.createStream(this.ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, hashMap3, hashMap, StorageLevel.MEMORY_ONLY_SER());
        final HashMap hashMap4 = new HashMap();
        createStream.map(new Function<Tuple2<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.JavaKafkaStreamSuite.1
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return (String) tuple2._2();
            }
        }).countByValue().foreachRDD(new Function<JavaPairRDD<String, Long>, Void>() { // from class: org.apache.spark.streaming.kafka.JavaKafkaStreamSuite.2
            public Void call(JavaPairRDD<String, Long> javaPairRDD) throws Exception {
                for (Tuple2 tuple2 : javaPairRDD.collect()) {
                    if (hashMap4.containsKey(tuple2._1())) {
                        hashMap4.put(tuple2._1(), Long.valueOf(((Long) hashMap4.get(tuple2._1())).longValue() + ((Long) tuple2._2()).longValue()));
                    } else {
                        hashMap4.put(tuple2._1(), tuple2._2());
                    }
                }
                return null;
            }
        });
        this.ssc.start();
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        while (!z && System.currentTimeMillis() - currentTimeMillis < 20000) {
            z = hashMap2.size() == hashMap4.size();
            Thread.sleep(200L);
        }
        Assert.assertEquals(hashMap2.size(), hashMap4.size());
        for (String str : hashMap2.keySet()) {
            Assert.assertEquals(((Integer) hashMap2.get(str)).intValue(), ((Long) hashMap4.get(str)).intValue());
        }
    }
}
