package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

/* loaded from: input_file:org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.class */
/**
 * JUnit suite exercising the Java-facing {@code KafkaUtils.createDirectStream} overloads.
 *
 * <p>The class is {@link Serializable} and keeps its fixtures {@code transient}: the anonymous
 * function objects created in the test are inner classes of this suite, so they carry a
 * reference to it.
 */
public class JavaDirectKafkaStreamSuite implements Serializable {
    /** Upper bound, in milliseconds, on how long the test polls for the expected messages. */
    private static final long RESULT_TIMEOUT_MS = 20000L;

    /** Pause, in milliseconds, between two polls of the collected result set. */
    private static final long POLL_INTERVAL_MS = 50L;

    private transient JavaStreamingContext ssc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    /** Starts the Kafka test fixture and creates a local streaming context with 200 ms batches. */
    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        SparkConf sparkConf = new SparkConf()
                .setMaster("local[4]")
                .setAppName(getClass().getSimpleName());
        this.ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200L));
    }

    /** Stops the streaming context and tears down the Kafka test fixture, if they were created. */
    @After
    public void tearDown() {
        if (this.ssc != null) {
            this.ssc.stop();
            this.ssc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    /**
     * Sends three messages to each of two topics, reads them back through a union of two direct
     * streams (one created from a topic set, one from explicit starting offsets with a message
     * handler), and asserts that exactly the sent messages were received.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for results
     */
    @Test
    public void testKafkaStream() throws InterruptedException {
        final String topic1 = "topic1";
        final String topic2 = "topic2";

        Set<String> sent = new HashSet<String>();
        sent.addAll(Arrays.asList(createTopicAndSendData(topic1)));
        sent.addAll(Arrays.asList(createTopicAndSendData(topic2)));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", this.kafkaTestUtils.brokerAddress());
        // NOTE(review): presumably makes the stream start at the earliest available offset so the
        // messages sent above are not missed - confirm against the Kafka consumer configuration.
        kafkaParams.put("auto.offset.reset", "smallest");

        // Stream 1: subscribe by topic name and keep only the value of each (key, value) pair.
        JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
                this.ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicToSet(topic1)
        ).map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> keyAndValue) throws Exception {
                return keyAndValue._2();
            }
        });

        // Stream 2: start from explicit per-partition offsets and extract the payload via a handler.
        JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
                this.ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                topicOffsetToMap(topic2, 0L),
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> record) throws Exception {
                        return record.message();
                    }
                }
        );

        JavaDStream<String> unifiedStream = stream1.union(stream2);

        // The foreachRDD callback is invoked by the streaming context while this thread polls the
        // set below, so the set must be safe for concurrent access.
        final Set<String> received = Collections.synchronizedSet(new HashSet<String>());
        unifiedStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                received.addAll(rdd.collect());
                return null;
            }
        });

        this.ssc.start();

        // Poll until the number of received messages matches or the timeout elapses.
        long startTime = System.currentTimeMillis();
        boolean matches = false;
        while (!matches && System.currentTimeMillis() - startTime < RESULT_TIMEOUT_MS) {
            matches = sent.size() == received.size();
            Thread.sleep(POLL_INTERVAL_MS);
        }
        Assert.assertEquals(sent, received);
        this.ssc.stop();
    }

    /**
     * Wraps a single topic name in a mutable set.
     *
     * @param topic the topic name
     * @return a new set containing only {@code topic}
     */
    private HashSet<String> topicToSet(String topic) {
        HashSet<String> topics = new HashSet<String>();
        topics.add(topic);
        return topics;
    }

    /**
     * Builds a starting-offset map for partition 0 of the given topic.
     *
     * @param topic the topic name
     * @param offset the offset to associate with partition 0
     * @return a new map with a single entry for {@code (topic, 0)}
     */
    private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offset) {
        HashMap<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
        offsets.put(new TopicAndPartition(topic, 0), offset);
        return offsets;
    }

    /**
     * Creates the topic and sends three messages named {@code <topic>-1} to {@code <topic>-3}.
     *
     * @param topic the topic to create and write to
     * @return the messages that were sent
     */
    private String[] createTopicAndSendData(String topic) {
        String[] data = {topic + "-1", topic + "-2", topic + "-3"};
        this.kafkaTestUtils.createTopic(topic);
        this.kafkaTestUtils.sendMessages(topic, data);
        return data;
    }
}
