package org.apache.streams.twitter.processor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterConfigurator;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.TwitterObjectFactory;
import twitter4j.conf.ConfigurationBuilder;

/* loaded from: input_file:org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.class */
/**
 * Stream processor that re-fetches the tweet behind an {@link Activity} from the Twitter REST API
 * and overwrites the activity's contents with the freshly fetched data.
 *
 * <p>Only activities whose provider id equals the Twitter provider id are touched; every other
 * activity passes through unchanged. When Twitter reports that the rate limit was exceeded, the
 * fetch is retried up to {@link #MAX_ATTEMPTS} times with a linearly growing sleep of
 * {@link #BACKOFF} milliseconds per attempt.
 *
 * <p>NOTE(review): {@code retryCount} and the lazily created client are plain mutable fields with
 * no synchronization in this class, so an instance is presumably meant to be used by a single
 * thread at a time - confirm against the runtime that drives it.
 */
public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
    private static final String PROVIDER_ID = TwitterActivityUtil.getProvider().getId();
    private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class);

    /** Maximum number of rate-limit retries for a single activity. */
    private static final int MAX_ATTEMPTS = 5;

    /** Base back-off in milliseconds (4 minutes); multiplied by the current retry number. */
    public static final int BACKOFF = 240000;

    /** Prefix stripped from the activity object's id to obtain the numeric tweet id. */
    private static final String TWEET_ID_PREFIX = "id:twitter:tweets:";

    private static final long MILLIS_PER_MINUTE = 60000L;

    private final TwitterStreamConfiguration config;
    private Twitter client;
    private ObjectMapper mapper;
    private int retryCount;

    /**
     * Creates a processor configured from the {@code "twitter"} section of the global streams
     * configuration.
     */
    public FetchAndReplaceTwitterProcessor() {
        this(TwitterConfigurator.detectTwitterStreamConfiguration(StreamsConfigurator.config.getConfig("twitter")));
    }

    /**
     * Creates a processor that uses the given configuration for its OAuth credentials.
     *
     * @param twitterStreamConfiguration configuration read when the Twitter client is built
     */
    public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration twitterStreamConfiguration) {
        this.config = twitterStreamConfiguration;
    }

    /**
     * Refreshes the activity carried by the datum if it originates from Twitter.
     *
     * @param streamsDatum datum whose document must be an {@link Activity}
     * @return a single-element list containing the same datum instance (its activity is updated
     *         in place)
     * @throws IllegalStateException if the datum's document is not an {@link Activity}
     */
    public List<StreamsDatum> process(StreamsDatum streamsDatum) {
        if (!(streamsDatum.getDocument() instanceof Activity)) {
            throw new IllegalStateException("Requires an activity document");
        }
        Activity activity = (Activity) streamsDatum.getDocument();
        String id = activity.getId();
        // An activity without a provider cannot be a Twitter activity; pass it through untouched
        // instead of failing with a NullPointerException.
        if (activity.getProvider() != null && PROVIDER_ID.equals(activity.getProvider().getId())) {
            fetchAndReplace(activity, id);
        }
        return Lists.newArrayList(streamsDatum);
    }

    /**
     * Initializes the Twitter client and the JSON mapper.
     *
     * @param obj ignored by this processor
     */
    public void prepare(Object obj) {
        this.client = getTwitterClient();
        this.mapper = StreamsTwitterMapper.getInstance();
    }

    /** No resources to release. */
    public void cleanUp() {
    }

    /**
     * Fetches the tweet for the activity, applies it to the activity and restores the activity's
     * original id. Failures are logged rather than propagated; a rate-limit failure triggers
     * {@link #sleepAndTryAgain(Activity, String)}.
     *
     * @param activity activity to update in place
     * @param str      the activity id to restore after the update
     */
    protected void fetchAndReplace(Activity activity, String str) {
        try {
            replace(activity, fetch(activity));
            activity.setId(str);
            this.retryCount = 0;
        } catch (TwitterException e) {
            // Must be caught before the generic handler below, otherwise the rate-limit
            // back-off is never reached.
            if (e.exceededRateLimitation()) {
                sleepAndTryAgain(activity, str);
            } else {
                LOGGER.warn("Twitter error while fetching tweet for activity {}", activity.getId(), e);
                this.retryCount = 0;
            }
        } catch (Exception e) {
            LOGGER.warn("Error fetching and replacing tweet for activity {}", activity.getId(), e);
            this.retryCount = 0;
        }
    }

    /**
     * Deserializes the raw Twitter JSON and applies it to the activity. Tweets and retweets are
     * applied as tweets, deletes as deletes; any other detected event class is only logged.
     *
     * @param activity activity to update in place
     * @param str      raw JSON returned by Twitter
     * @throws IOException                 if the JSON cannot be deserialized
     * @throws ActivitySerializerException if the activity cannot be updated from the parsed object
     */
    protected void replace(Activity activity, String str) throws IOException, ActivitySerializerException {
        Class<?> detectedClass = TwitterEventClassifier.detectClass(str);
        Object parsed = this.mapper.readValue(str, detectedClass);
        if (detectedClass.equals(Retweet.class) || detectedClass.equals(Tweet.class)) {
            TwitterActivityUtil.updateActivity((Tweet) parsed, activity);
        } else if (detectedClass.equals(Delete.class)) {
            TwitterActivityUtil.updateActivity((Delete) parsed, activity);
        } else {
            LOGGER.info("Could not determine the correct update method for {}", detectedClass);
        }
    }

    /**
     * Retrieves the raw JSON of the status referenced by the activity's object id.
     *
     * @param activity activity whose object id has the form {@code id:twitter:tweets:<number>}
     * @return the raw JSON of the fetched status
     * @throws TwitterException if the Twitter API call fails
     */
    protected String fetch(Activity activity) throws TwitterException {
        String id = activity.getObject().getId();
        LOGGER.debug("Fetching status from Twitter for {}", id);
        long statusId = Long.parseLong(id.replace(TWEET_ID_PREFIX, ""));
        return TwitterObjectFactory.getRawJSON(getTwitterClient().showStatus(statusId));
    }

    /**
     * Returns the Twitter client, building it from the configured OAuth credentials on first use.
     * JSON store is enabled so that {@link #fetch(Activity)} can obtain the raw JSON.
     *
     * @return the client instance held by this processor
     */
    protected Twitter getTwitterClient() {
        if (this.client == null) {
            ConfigurationBuilder builder = new ConfigurationBuilder()
                    .setOAuthConsumerKey(this.config.getOauth().getConsumerKey())
                    .setOAuthConsumerSecret(this.config.getOauth().getConsumerSecret())
                    .setOAuthAccessToken(this.config.getOauth().getAccessToken())
                    .setOAuthAccessTokenSecret(this.config.getOauth().getAccessTokenSecret())
                    .setIncludeEntitiesEnabled(true)
                    .setJSONStoreEnabled(true)
                    .setAsyncNumThreads(1)
                    .setRestBaseURL("https://api.twitter.com:443/1.1/")
                    .setIncludeMyRetweetEnabled(true)
                    .setPrettyDebugEnabled(true);
            this.client = new TwitterFactory(builder.build()).getInstance();
        }
        return this.client;
    }

    /**
     * Sleeps for {@code BACKOFF * attempt} milliseconds and retries the fetch, giving up once
     * {@link #MAX_ATTEMPTS} retries have been made. If the sleep is interrupted, the thread's
     * interrupt status is restored and the retry is abandoned.
     *
     * @param activity activity to update in place
     * @param str      the activity id to restore after a successful update
     */
    protected void sleepAndTryAgain(Activity activity, String str) {
        if (this.retryCount >= MAX_ATTEMPTS) {
            LOGGER.warn("Giving up on activity {} after {} rate-limited attempts", activity.getId(), MAX_ATTEMPTS);
            this.retryCount = 0;
            return;
        }
        this.retryCount++;
        long backoffMillis = (long) BACKOFF * this.retryCount;
        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", backoffMillis / MILLIS_PER_MINUTE);
        try {
            Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
            LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
            // Restore the flag so the code driving this processor can observe the interruption.
            Thread.currentThread().interrupt();
            this.retryCount = 0;
            return;
        }
        fetchAndReplace(activity, str);
    }
}
