package uk.camsw.rx.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:uk/camsw/rx/kafka/HighLevelKafkaStream.class */
/**
 * Factory for an RxJava {@link Observable} that emits the messages of a Kafka topic
 * read through the high-level consumer API.
 */
public class HighLevelKafkaStream {
    private static final Logger logger = LoggerFactory.getLogger(HighLevelKafkaStream.class);

    /** Number of streams requested from the connector for the topic. */
    private static final int STREAM_COUNT = 1;

    /** Length of the pause applied on the subscribing thread once the consumer tasks are scheduled. */
    private static final long STARTUP_PAUSE_MILLIS = 300L;

    /**
     * Creates a shared observable over the given topic.
     *
     * <p>When the first subscriber connects, a {@link ConsumerConnector} is created from
     * {@code consumerConfig}, {@value #STREAM_COUNT} stream is requested for the topic, and each
     * returned stream is drained on its own single-thread executor. The observable is wrapped in
     * {@code publish().refCount()}, so subscribers share one connection; when it is unsubscribed
     * the executors and the connector are shut down.
     *
     * <p>The subscribing thread is blocked for {@value #STARTUP_PAUSE_MILLIS} ms during set-up.
     *
     * @param str            name of the topic to consume
     * @param consumerConfig configuration used to create the consumer connector
     * @return an observable emitting every message read from the topic's streams; an exception
     *         raised while reading is delivered through {@code onError}
     */
    public static Observable<MessageAndMetadata<byte[], byte[]>> create(String str, ConsumerConfig consumerConfig) {
        // Explicit type witness: without it the chained call infers Observable<Object>.
        return Observable.<MessageAndMetadata<byte[], byte[]>>create(subscriber -> {
            logger.debug("Subscribing to: [{}]", str);
            ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);
            try {
                Map<String, Integer> topicCountMap = new HashMap<>();
                topicCountMap.put(str, STREAM_COUNT);
                List<KafkaStream<byte[], byte[]>> streams = connector.createMessageStreams(topicCountMap).get(str);
                logger.debug("Retrieved partition streams.  Size: [{}]", streams.size());
                for (KafkaStream<byte[], byte[]> stream : streams) {
                    ExecutorService executor = Executors.newSingleThreadExecutor();
                    Scheduler scheduler = Schedulers.from(executor);
                    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
                    subscriber.add(scheduler.createWorker().schedule(() -> drain(messages, subscriber)));
                    // shutdown() (not shutdownNow()) lets the running drain task finish on its own
                    // once the connector is closed, after which the executor thread terminates.
                    subscriber.add(Subscriptions.create(executor::shutdown));
                }
                pauseAfterStartup();
                subscriber.add(Subscriptions.create(connector::shutdown));
            } catch (RuntimeException e) {
                // Set-up failed before the shutdown hook was registered: release the connector here.
                connector.shutdown();
                throw e;
            }
        }).doOnUnsubscribe(() -> {
            logger.debug("Unsubscribed from topic: [{}]", str);
        }).publish().refCount();
    }

    /** Forwards messages from the iterator to the subscriber until it unsubscribes or the iterator ends. */
    private static void drain(ConsumerIterator<byte[], byte[]> messages,
                              Subscriber<? super MessageAndMetadata<byte[], byte[]>> subscriber) {
        try {
            while (!subscriber.isUnsubscribed() && messages.hasNext()) {
                subscriber.onNext(messages.next());
            }
        } catch (Exception e) {
            // Thread boundary: surface the failure to the subscriber instead of losing it.
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(e);
            } else {
                logger.debug("Consumer loop failed after unsubscription", e);
            }
        }
    }

    /** Sleeps for the start-up pause, restoring the interrupt flag if the sleep is interrupted. */
    private static void pauseAfterStartup() {
        // NOTE(review): the reason for this pause is not evident from the code; presumably it
        // gives the consumer tasks time to start before subscription completes - confirm.
        try {
            Thread.sleep(STARTUP_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while pausing after consumer start-up", e);
        }
    }
}
