package org.apache.streams.twitter.processor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.twitter.converter.StreamsTwitterMapper;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/twitter/processor/TwitterProfileProcessor.class */
/**
 * Extracts Twitter user profiles from tweet, retweet and user documents.
 *
 * <p>{@link #process(StreamsDatum)} classifies the incoming document with
 * {@link TwitterEventClassifier} and emits one datum holding the relevant {@link User}:
 * the author of a tweet, the author of the retweeted status of a retweet, or the user
 * document itself. Documents of any other detected type yield an empty list.
 *
 * <p>{@link #run()} drains {@code inQueue} through {@code process} into {@code outQueue}
 * until a datum whose document equals {@link #TERMINATE} is polled.
 * NOTE(review): neither queue is assigned anywhere in this class; confirm how (or
 * whether) they are expected to be populated before {@code run()} is used.
 */
public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
    private static final String STREAMS_ID = "TwitterProfileProcessor";
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);

    /**
     * Sentinel document value: {@link #run()} stops when it polls a datum whose document
     * equals this string.
     */
    public static final String TERMINATE = new String("TERMINATE");

    /** Exclusive upper bound, in milliseconds, of the random pause taken on each loop pass. */
    private static final int MAX_PAUSE_MILLIS = 100;

    private ObjectMapper mapper = new StreamsTwitterMapper();
    private Queue<StreamsDatum> inQueue;
    private Queue<StreamsDatum> outQueue;

    /**
     * Polls {@code inQueue}, processes each datum and offers the results to
     * {@code outQueue}, pausing a random 0-99 ms on every pass.
     *
     * <p>The loop ends when a datum whose document equals {@link #TERMINATE} is polled or
     * when the thread is interrupted (the interrupt status is restored). Any other
     * exception raised while handling an item is logged and the loop continues. If either
     * queue is unset the method logs an error and returns immediately.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (this.inQueue == null || this.outQueue == null) {
            LOGGER.error("Cannot run: input and/or output queue has not been set");
            return;
        }
        // One generator for the whole loop instead of a fresh instance per iteration.
        Random jitter = new Random();
        while (true) {
            try {
                // poll() returns null when the queue is currently empty.
                StreamsDatum datum = this.inQueue.poll();
                // Compare the document (not the datum itself) against the sentinel.
                if (datum != null && TERMINATE.equals(datum.getDocument())) {
                    LOGGER.info("Terminating!");
                    return;
                }
                Thread.sleep(jitter.nextInt(MAX_PAUSE_MILLIS));
                if (datum == null) {
                    continue;
                }
                for (StreamsDatum profile : process(datum)) {
                    this.outQueue.offer(profile);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread, then stop.
                Thread.currentThread().interrupt();
                LOGGER.info("Interrupted; stopping");
                return;
            } catch (Exception e) {
                LOGGER.warn("Error while handling queue item", e);
            }
        }
    }

    /**
     * Wraps a user in a datum identified by the user's id string.
     *
     * @param user the user to wrap; must not be {@code null}
     * @return a datum whose document is {@code user} and whose id is {@code user.getIdStr()}
     */
    public StreamsDatum createStreamsDatum(User user) {
        return new StreamsDatum(user, user.getIdStr());
    }

    /**
     * Returns the fixed identifier of this processor.
     *
     * @return the string {@code "TwitterProfileProcessor"}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Extracts the user profile from a tweet, retweet or user document.
     *
     * <p>A {@code String} document is used as-is; any other document is cast to
     * {@link ObjectNode} and serialized with the mapper. The resulting JSON is classified
     * by {@link TwitterEventClassifier#detectClass} and deserialized accordingly.
     *
     * @param streamsDatum the datum to process; {@code null} or a {@code null} document
     *                     yields an empty list
     * @return a list with one datum holding the extracted {@link User}, or an empty list
     *         when the document type is not tweet/retweet/user, no user is present, or
     *         any error occurs (errors are logged, never thrown)
     */
    public List<StreamsDatum> process(StreamsDatum streamsDatum) {
        List<StreamsDatum> profiles = Lists.newArrayList();
        if (streamsDatum == null || streamsDatum.getDocument() == null) {
            return profiles;
        }
        try {
            Object document = streamsDatum.getDocument();
            // A document that is neither String nor ObjectNode fails the cast and is
            // reported by the catch block below.
            String json = document instanceof String
                    ? (String) document
                    : this.mapper.writeValueAsString((ObjectNode) document);
            Class<?> detected = TwitterEventClassifier.detectClass(json);

            User user = null;
            if (Tweet.class.equals(detected)) {
                LOGGER.debug("TWEET");
                user = this.mapper.readValue(json, Tweet.class).getUser();
            } else if (Retweet.class.equals(detected)) {
                LOGGER.debug("RETWEET");
                Tweet retweeted = this.mapper.readValue(json, Retweet.class).getRetweetedStatus();
                user = retweeted == null ? null : retweeted.getUser();
            } else if (User.class.equals(detected)) {
                LOGGER.debug("USER");
                user = this.mapper.readValue(json, User.class);
            } else {
                // Not a document type that carries a profile.
                return profiles;
            }

            if (user == null) {
                LOGGER.warn("No user found in {}", streamsDatum);
                return profiles;
            }
            profiles.add(createStreamsDatum(user));
            return profiles;
        } catch (Exception e) {
            // Parameterized form keeps the stack trace and cannot itself throw.
            LOGGER.warn("Error processing {}", streamsDatum, e);
            return Lists.newArrayList();
        }
    }

    /**
     * No-op; this processor performs no setup.
     *
     * @param obj ignored
     */
    public void prepare(Object obj) {
    }

    /** No-op; this processor holds nothing that needs releasing. */
    public void cleanUp() {
    }
}
