package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/twitter/provider/TwitterStreamProcessor.class */
public class TwitterStreamProcessor extends StringDelimitedProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class);
    private static final int DEFAULT_POOL_SIZE = 5;
    private final TwitterStreamProvider provider;
    private final ExecutorService service;

    /* loaded from: input_file:org/apache/streams/twitter/provider/TwitterStreamProcessor$StreamDeserializer.class */
    protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
        protected static final ObjectMapper mapper = StreamsTwitterMapper.getInstance();
        protected String item;

        public StreamDeserializer(String str) {
            this.item = str;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public List<StreamsDatum> call() throws Exception {
            return this.item != null ? Lists.newArrayList(new StreamsDatum[]{new StreamsDatum(mapper.readTree(this.item))}) : Lists.newArrayList();
        }
    }

    public TwitterStreamProcessor(TwitterStreamProvider twitterStreamProvider) {
        this(twitterStreamProvider, DEFAULT_POOL_SIZE);
    }

    public TwitterStreamProcessor(TwitterStreamProvider twitterStreamProvider, int i) {
        super((BlockingQueue) null);
        this.service = Executors.newFixedThreadPool(i);
        this.provider = twitterStreamProvider;
    }

    public boolean process() throws IOException, InterruptedException {
        String processNextMessage;
        do {
            processNextMessage = processNextMessage();
            if (processNextMessage == null) {
                Thread.sleep(10L);
            }
        } while (processNextMessage == null);
        return this.provider.addDatum(this.service.submit(new StreamDeserializer(processNextMessage)));
    }

    public void cleanUp() {
        ComponentUtils.shutdownExecutor(this.service, 1, 30);
    }
}
