package co.cask.cdap.packs.twitter;

import co.cask.cdap.api.annotation.Tick;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.metrics.Metrics;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

/* loaded from: input_file:co/cask/cdap/packs/twitter/TweetCollectorFlowlet.class */
/**
 * Flowlet that gathers {@link Tweet}s and emits them downstream.
 *
 * <p>Tweets come from up to two sources, both configured through runtime arguments read in
 * {@link #initialize(FlowletContext)}:
 * <ul>
 *   <li>the Twitter sample stream (via twitter4j), unless {@link #ARG_TWITTER4J_DISABLED}
 *       parses as {@code true};</li>
 *   <li>a local file named by {@link #ARG_SOURCE_FILE}, read as one JSON-encoded tweet per line.</li>
 * </ul>
 * Both sources push into a bounded in-memory queue; {@link #collect()} drains that queue and emits
 * at most {@code emitBatchMaxSize} tweets per invocation.
 */
public class TweetCollectorFlowlet extends AbstractFlowlet {
    private static final Logger LOG = LoggerFactory.getLogger(TweetCollectorFlowlet.class);
    private static final Gson GSON = new Gson();

    /** Runtime argument; when its value parses as {@code true} the Twitter stream is not started. */
    public static final String ARG_TWITTER4J_DISABLED = "tweet.collector.source.twitter4j.disabled";
    /** Runtime argument naming a file to read tweets from (one JSON object per line). */
    public static final String ARG_SOURCE_FILE = "tweet.collector.source.file";
    /** Capacity of the internal queue used by the no-size constructors. */
    public static final int DEFAULT_INTERNAL_QUEUE_SIZE = 10000;
    /** Maximum number of tweets emitted per {@link #collect()} call used by the no-size constructors. */
    public static final int DEFAULT_EMIT_BATCH_MAX_SIZE = 100;
    /** Runtime argument carrying the twitter4j OAuth consumer key. */
    public static final String ARG_TWITTER4J_OAUTH_CONSUMER_KEY = "twitter4j.oauth.consumerKey";
    /** Runtime argument carrying the twitter4j OAuth consumer secret. */
    public static final String ARG_TWITTER4J_OAUTH_CONSUMER_SECRET = "twitter4j.oauth.consumerSecret";
    /** Runtime argument carrying the twitter4j OAuth access token. */
    public static final String ARG_TWITTER4J_OAUTH_ACCESS_TOKEN = "twitter4j.oauth.accessToken";
    /** Runtime argument carrying the twitter4j OAuth access token secret. */
    public static final String ARG_TWITTER4J_OAUTH_ACCESS_TOKEN_SECRET = "twitter4j.oauth.accessTokenSecret";

    // NOTE(review): metrics and output are never assigned in this class; presumably the
    // framework injects them. Their names and modifiers are left untouched for that reason.
    private Metrics metrics;
    private OutputEmitter<Tweet> output;
    // Bounded hand-off buffer between the producers and collect(); stays null when no source is enabled.
    private BlockingQueue<Tweet> queue;
    private FileReaderThread fileReader;
    private TweetPuller twitterStreamPuller;
    private int internalQueueSize;
    private int emitBatchMaxSize;

    /**
     * Background thread that reads the source file line by line, parses each line as a JSON
     * {@link Tweet} and puts it on the internal queue, blocking while the queue is full.
     */
    public class FileReaderThread extends Thread {
        private final String srcFile;

        private FileReaderThread(String sourcePath) {
            super("FileReaderThread");
            this.srcFile = sourcePath;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                Files.readLines(new File(this.srcFile), Charsets.UTF_8, new LineProcessor<Object>() {
                    /**
                     * Parses and enqueues one line.
                     *
                     * @return {@code true} to keep reading, {@code false} to stop after an interrupt
                     */
                    public boolean processLine(String line) throws IOException {
                        Tweet tweet;
                        try {
                            tweet = TweetCollectorFlowlet.GSON.fromJson(line, Tweet.class);
                        } catch (JsonSyntaxException e) {
                            // One bad line must not kill the reader thread; skip it and continue.
                            TweetCollectorFlowlet.LOG.warn(
                                "Skipping malformed tweet line in file " + FileReaderThread.this.srcFile, e);
                            return true;
                        }
                        if (tweet == null) {
                            // Gson yields null for an empty line; the queue rejects null elements.
                            return true;
                        }
                        try {
                            TweetCollectorFlowlet.this.queue.put(tweet);
                            return true;
                        } catch (InterruptedException e) {
                            TweetCollectorFlowlet.LOG.warn("Interrupted while writing to a queue", e);
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }

                    public Object getResult() {
                        return null;
                    }
                });
            } catch (IOException e) {
                TweetCollectorFlowlet.LOG.error("Failed to read tweets from file " + this.srcFile, e);
            }
            TweetCollectorFlowlet.LOG.info("FileReaderThread run() is exiting");
        }
    }

    /**
     * Service that opens the Twitter sample stream and forwards every received status to the
     * internal queue as a {@link Tweet} until a stop is requested.
     */
    public class TweetPuller extends AbstractExecutionThreadService {
        private final Configuration twitter4jConf;
        // Released by triggerShutdown(); a count of zero means "stop requested".
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        private TwitterStream twitterStream;

        private TweetPuller(Configuration configuration) {
            this.twitter4jConf = configuration;
        }

        /** Signals {@link #run()} and the status listener that the service must stop. */
        protected void triggerShutdown() {
            this.stopRequested.countDown();
            super.triggerShutdown();
        }

        /** Releases the Twitter stream, if one was created. */
        protected void shutDown() throws Exception {
            // run() may have failed before the stream was assigned.
            if (this.twitterStream != null) {
                this.twitterStream.cleanUp();
                this.twitterStream.shutdown();
            }
            super.shutDown();
        }

        /**
         * Starts the sample stream and then waits for a stop request.
         *
         * <p>The wait matters: the base service invokes {@link #shutDown()} as soon as this
         * method returns, which would close the stream right after opening it.
         * NOTE(review): this relies on twitter4j's {@code sample()} returning once streaming has
         * started on its own thread; confirm against the twitter4j version in use.
         */
        public void run() {
            this.twitterStream = new TwitterStreamFactory(this.twitter4jConf).getInstance();
            this.twitterStream.addListener(new StatusAdapter() {
                public void onStatus(Status status) {
                    Tweet tweet = new Tweet(status.getText(), status.getCreatedAt().getTime());
                    try {
                        // Retry with a timeout so a full queue cannot block past a stop request.
                        while (TweetPuller.this.stopRequested.getCount() > 0) {
                            if (TweetCollectorFlowlet.this.queue.offer(tweet, 1L, TimeUnit.SECONDS)) {
                                return;
                            }
                        }
                    } catch (InterruptedException e) {
                        TweetCollectorFlowlet.LOG.warn("Interrupted while writing to a queue", e);
                        Thread.currentThread().interrupt();
                    }
                }

                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                    TweetCollectorFlowlet.LOG.error("Got track limitation notice:" + numberOfLimitedStatuses);
                }

                public void onException(Exception exc) {
                    // Pass the throwable itself so the stack trace is not lost.
                    TweetCollectorFlowlet.LOG.warn("Error during reading from stream", exc);
                }
            });
            this.twitterStream.sample();
            try {
                this.stopRequested.await();
            } catch (InterruptedException e) {
                TweetCollectorFlowlet.LOG.warn("Interrupted while waiting for a stop request", e);
                Thread.currentThread().interrupt();
            }
            TweetCollectorFlowlet.LOG.info("CollectingThread run() is exiting");
        }
    }

    /** Creates a flowlet with the default queue capacity and emit batch size. */
    public TweetCollectorFlowlet() {
        this(DEFAULT_INTERNAL_QUEUE_SIZE, DEFAULT_EMIT_BATCH_MAX_SIZE);
    }

    /**
     * Creates a named flowlet with the default queue capacity and emit batch size.
     *
     * @param name value passed to the {@link AbstractFlowlet} constructor
     */
    public TweetCollectorFlowlet(String name) {
        this(name, DEFAULT_INTERNAL_QUEUE_SIZE, DEFAULT_EMIT_BATCH_MAX_SIZE);
    }

    /**
     * Creates a flowlet with explicit sizing.
     *
     * @param internalQueueSize capacity of the internal tweet queue
     * @param emitBatchMaxSize  maximum number of tweets emitted per {@link #collect()} call
     */
    public TweetCollectorFlowlet(int internalQueueSize, int emitBatchMaxSize) {
        this.internalQueueSize = internalQueueSize;
        this.emitBatchMaxSize = emitBatchMaxSize;
    }

    /**
     * Creates a named flowlet with explicit sizing.
     *
     * @param name              value passed to the {@link AbstractFlowlet} constructor
     * @param internalQueueSize capacity of the internal tweet queue
     * @param emitBatchMaxSize  maximum number of tweets emitted per {@link #collect()} call
     */
    public TweetCollectorFlowlet(String name, int internalQueueSize, int emitBatchMaxSize) {
        super(name);
        this.internalQueueSize = internalQueueSize;
        this.emitBatchMaxSize = emitBatchMaxSize;
    }

    /**
     * Reads the runtime arguments and starts the enabled tweet sources.
     *
     * <p>The queue is only created when at least one source is enabled; otherwise it stays
     * {@code null} and {@link #collect()} just sleeps.
     *
     * @param flowletContext context supplying the runtime arguments
     * @throws Exception if the superclass initialization fails
     */
    public void initialize(FlowletContext flowletContext) throws Exception {
        super.initialize(flowletContext);
        Map<String, String> runtimeArguments = flowletContext.getRuntimeArguments();

        // parseBoolean(null) is false, so a missing argument leaves the stream enabled.
        boolean twitterDisabled = Boolean.parseBoolean(runtimeArguments.get(ARG_TWITTER4J_DISABLED));
        if (twitterDisabled) {
            LOG.warn("Pulling tweets is disabled via '" + ARG_TWITTER4J_DISABLED + "' runtime argument.");
        }

        String sourceFile = runtimeArguments.get(ARG_SOURCE_FILE);
        if (sourceFile != null) {
            LOG.info("Will read tweets from file: " + sourceFile);
        }

        if (!twitterDisabled || sourceFile != null) {
            this.queue = new LinkedBlockingQueue<Tweet>(this.internalQueueSize);
        }
        if (!twitterDisabled) {
            this.twitterStreamPuller = createTwitterStreamPuller(runtimeArguments);
            this.twitterStreamPuller.start();
        }
        if (sourceFile != null) {
            this.fileReader = new FileReaderThread(sourceFile);
            this.fileReader.start();
        }
    }

    /**
     * Builds a {@link TweetPuller} whose twitter4j configuration carries whichever OAuth
     * credentials are present in the given arguments; absent keys are simply not set.
     *
     * @param map runtime arguments to read the OAuth settings from
     * @return a new, not yet started puller
     */
    private TweetPuller createTwitterStreamPuller(Map<String, String> map) {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        if (map.containsKey(ARG_TWITTER4J_OAUTH_CONSUMER_KEY)) {
            configurationBuilder.setOAuthConsumerKey(map.get(ARG_TWITTER4J_OAUTH_CONSUMER_KEY));
        }
        if (map.containsKey(ARG_TWITTER4J_OAUTH_CONSUMER_SECRET)) {
            configurationBuilder.setOAuthConsumerSecret(map.get(ARG_TWITTER4J_OAUTH_CONSUMER_SECRET));
        }
        if (map.containsKey(ARG_TWITTER4J_OAUTH_ACCESS_TOKEN)) {
            configurationBuilder.setOAuthAccessToken(map.get(ARG_TWITTER4J_OAUTH_ACCESS_TOKEN));
        }
        if (map.containsKey(ARG_TWITTER4J_OAUTH_ACCESS_TOKEN_SECRET)) {
            configurationBuilder.setOAuthAccessTokenSecret(map.get(ARG_TWITTER4J_OAUTH_ACCESS_TOKEN_SECRET));
        }
        return new TweetPuller(configurationBuilder.build());
    }

    /** Requests the Twitter puller to stop and interrupts the file reader, if they were started. */
    public void destroy() {
        if (this.twitterStreamPuller != null) {
            this.twitterStreamPuller.stop();
        }
        if (this.fileReader != null) {
            this.fileReader.interrupt();
        }
    }

    /**
     * Drains up to {@code emitBatchMaxSize} queued tweets and emits each one, counting it in the
     * {@code tweet.collector.out.total} metric. Returns immediately when the queue is empty.
     *
     * @throws InterruptedException if interrupted while sleeping because no source is enabled
     */
    @Tick(unit = TimeUnit.NANOSECONDS, delay = 0)
    public void collect() throws InterruptedException {
        if (this.queue == null) {
            // No source was enabled in initialize(); back off instead of spinning.
            Thread.sleep(1000L);
            return;
        }
        for (int emitted = 0; emitted < this.emitBatchMaxSize; emitted++) {
            Tweet next = this.queue.poll();
            if (next == null) {
                return;
            }
            this.metrics.count("tweet.collector.out.total", 1);
            emit(next);
        }
    }

    /**
     * Sends one tweet to the output emitter.
     *
     * @param tweet the tweet to emit
     */
    protected void emit(Tweet tweet) {
        this.output.emit(tweet);
    }
}
