/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

public class JavaDirectKafkaStreamSuite
implements Serializable {
    private transient JavaStreamingContext ssc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
        this.ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds((long)200L));
    }

    @After
    public void tearDown() {
        if (this.ssc != null) {
            this.ssc.stop();
            this.ssc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaStream() throws InterruptedException {
        String topic1 = "topic1";
        String topic2 = "topic2";
        final AtomicReference offsetRanges = new AtomicReference();
        String[] topic1data = this.createTopicAndSendData("topic1");
        String[] topic2data = this.createTopicAndSendData("topic2");
        HashSet<String> sent = new HashSet<String>();
        sent.addAll(Arrays.asList(topic1data));
        sent.addAll(Arrays.asList(topic2data));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", this.kafkaTestUtils.brokerAddress());
        kafkaParams.put("auto.offset.reset", "smallest");
        JavaDStream stream1 = KafkaUtils.createDirectStream((JavaStreamingContext)this.ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, JavaDirectKafkaStreamSuite.topicToSet("topic1")).transformToPair((Function)new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>(){

            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
                OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                Assert.assertEquals((Object)"topic1", (Object)offsets[0].topic());
                return rdd;
            }
        }).map((Function)new Function<Tuple2<String, String>, String>(){

            public String call(Tuple2<String, String> kv) {
                return (String)kv._2();
            }
        });
        JavaInputDStream stream2 = KafkaUtils.createDirectStream((JavaStreamingContext)this.ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, JavaDirectKafkaStreamSuite.topicOffsetToMap("topic2", 0L), (Function)new Function<MessageAndMetadata<String, String>, String>(){

            public String call(MessageAndMetadata<String, String> msgAndMd) {
                return (String)msgAndMd.message();
            }
        });
        JavaDStream unifiedStream = stream1.union((JavaDStream)stream2);
        final Set result = Collections.synchronizedSet(new HashSet());
        unifiedStream.foreachRDD((VoidFunction)new VoidFunction<JavaRDD<String>>(){

            public void call(JavaRDD<String> rdd) {
                result.addAll(rdd.collect());
            }
        });
        this.ssc.start();
        long startTime = System.currentTimeMillis();
        boolean matches = false;
        while (!matches && System.currentTimeMillis() - startTime < 20000L) {
            matches = sent.size() == result.size();
            Thread.sleep(50L);
        }
        Assert.assertEquals(sent, result);
        this.ssc.stop();
    }

    private static Set<String> topicToSet(String topic) {
        HashSet<String> topicSet = new HashSet<String>();
        topicSet.add(topic);
        return topicSet;
    }

    private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
        HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
        topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
        return topicMap;
    }

    private String[] createTopicAndSendData(String topic) {
        String[] data = new String[]{topic + "-1", topic + "-2", topic + "-3"};
        this.kafkaTestUtils.createTopic(topic, 1);
        this.kafkaTestUtils.sendMessages(topic, data);
        return data;
    }
}

