package org.apache.htrace.zipkin;

import com.twitter.zipkin.gen.Span;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TracerPool;
import org.apache.htrace.impl.KafkaTransport;
import org.apache.htrace.impl.ZipkinSpanReceiver;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.Assert;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.mutable.Buffer;

/* loaded from: input_file:org/apache/htrace/zipkin/ITZipkinReceiver.class */
public class ITZipkinReceiver {
    @Test
    public void testKafkaTransport() throws Exception {
        ZkClient zkClient = new ZkClient(new EmbeddedZookeeper(TestZKUtils.zookeeperConnect()).connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
        Properties createBrokerConfig = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), false);
        KafkaConfig kafkaConfig = new KafkaConfig(createBrokerConfig);
        KafkaServer createServer = TestUtils.createServer(kafkaConfig, new MockTime());
        Buffer asScalaBuffer = JavaConversions.asScalaBuffer(Collections.singletonList(createServer));
        TestUtils.createTopic(zkClient, "zipkin", 1, 1, asScalaBuffer, new Properties());
        zkClient.close();
        TestUtils.waitUntilMetadataIsPropagated(asScalaBuffer, "zipkin", 0, 5000L);
        Tracer build = new Tracer.Builder("test-tracer").tracerPool(new TracerPool("test-tracer-pool")).conf(HTraceConfiguration.fromKeyValuePairs(new String[]{"sampler.classes", "AlwaysSampler", "span.receiver.classes", ZipkinSpanReceiver.class.getName(), "zipkin.kafka.metadata.broker.list", kafkaConfig.advertisedHostName() + ":" + kafkaConfig.advertisedPort(), "zipkin.kafka.topic", "zipkin", "zipkin.transport.class", KafkaTransport.class.getName()})).build();
        build.newScope("test-kafka-transport-scope").close();
        build.close();
        Properties properties = new Properties();
        properties.put("zookeeper.connect", createBrokerConfig.getProperty("zookeeper.connect"));
        properties.put("group.id", "testing.group");
        properties.put("auto.offset.reset", "smallest");
        ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        HashMap hashMap = new HashMap();
        hashMap.put("zipkin", 1);
        ConsumerIterator it = ((KafkaStream) ((List) createJavaConsumerConnector.createMessageStreams(hashMap).get("zipkin")).get(0)).iterator();
        Assert.assertTrue("We should have one message in Kafka", it.hasNext());
        Span span = new Span();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(span, (byte[]) it.next().message());
        Assert.assertEquals("The span name should match our scope description", span.getName(), "test-kafka-transport-scope");
        createServer.shutdown();
    }
}
