package org.apache.flink.streaming.connectors.twitter;

import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/twitter/TwitterSource.class */
/**
 * Source function that reads raw messages from the Twitter Streaming API sample endpoint and
 * forwards each one, as a {@code String}, to the collector.
 *
 * <p>OAuth credentials are read from a {@code .properties} file that must define the keys
 * {@code consumerKey}, {@code consumerSecret}, {@code token} and {@code secret}.
 *
 * <p>Two modes are supported: an unbounded stream ({@link #TwitterSource(String)}) and a bounded
 * run ({@link #TwitterSource(String, int)}).
 */
public class TwitterSource extends RichSourceFunction<String> {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
    private static final long serialVersionUID = 1;

    /** Path of the properties file holding the OAuth credentials. */
    private String authPath;
    /** Buffer the client writes incoming messages into; created in {@link #open}. */
    private transient BlockingQueue<String> queue;
    /** Streaming client; created and connected in {@link #open}. */
    private transient BasicClient client;
    /** Number of poll attempts made in bounded mode. */
    private int numberOfTweets;
    /** Capacity of {@link #queue}; read once when the connection is initialized. */
    private int queueSize = 10000;
    /** Seconds to wait for a message on each poll. */
    private int waitSec = 5;
    /** {@code true} for the unbounded stream, {@code false} for the bounded run. */
    private boolean streaming = true;

    /**
     * Creates a source that emits messages indefinitely.
     *
     * @param str path of the properties file containing the authentication information
     */
    public TwitterSource(String str) {
        this.authPath = str;
    }

    /**
     * Creates a source that makes a bounded number of poll attempts and then finishes.
     *
     * @param str path of the properties file containing the authentication information
     * @param i number of poll attempts; each attempt emits at most one message
     */
    public TwitterSource(String str, int i) {
        this.authPath = str;
        this.numberOfTweets = i;
        // Without this the limit is ignored and invoke() always takes the unbounded path.
        this.streaming = false;
    }

    /**
     * Builds the queue and the client and connects to the streaming endpoint.
     *
     * @param configuration not used by this source
     * @throws Exception if the connection cannot be initialized
     */
    public void open(Configuration configuration) throws Exception {
        initializeConnection();
    }

    /**
     * Emits messages to {@code collector}, either without bound or for the configured number of
     * poll attempts, depending on which constructor was used.
     *
     * @param collector receiver of the raw messages
     * @throws Exception if collecting fails
     */
    public void invoke(Collector<String> collector) throws Exception {
        if (this.streaming) {
            collectMessages(collector);
        } else {
            collectFiniteMessages(collector);
        }
    }

    /**
     * Stops the streaming client.
     *
     * @throws Exception if stopping the client fails
     */
    public void close() throws Exception {
        closeConnection();
    }

    /** Creates the message queue, configures the sample endpoint and connects the client. */
    private void initializeConnection() {
        LOG.info("Initializing Twitter Streaming API connection");
        this.queue = new LinkedBlockingQueue<String>(this.queueSize);
        StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
        endpoint.stallWarnings(false);
        initializeClient(endpoint, authenticate());
        LOG.info("Twitter Streaming API connection established successfully");
    }

    /** Builds the OAuth1 credentials from the four keys of the properties file. */
    private OAuth1 authenticate() {
        Properties auth = loadAuthenticationProperties();
        return new OAuth1(
                auth.getProperty("consumerKey"),
                auth.getProperty("consumerSecret"),
                auth.getProperty("token"),
                auth.getProperty("secret"));
    }

    /**
     * Loads the properties file located at {@link #authPath}.
     *
     * @return the loaded properties
     * @throws RuntimeException if the file cannot be opened, read or closed
     */
    private Properties loadAuthenticationProperties() {
        Properties properties = new Properties();
        try {
            FileInputStream input = new FileInputStream(this.authPath);
            try {
                properties.load(input);
            } finally {
                // Close on the failure path too, so a malformed file does not leak the handle.
                input.close();
            }
            return properties;
        } catch (Exception e) {
            throw new RuntimeException("Cannot open .properties file: " + this.authPath, e);
        }
    }

    /** Builds the client so that it writes delimited messages into the queue, then connects. */
    private void initializeClient(StatusesSampleEndpoint statusesSampleEndpoint, Authentication authentication) {
        this.client = new ClientBuilder()
                .name("twitterSourceClient")
                .hosts("https://stream.twitter.com")
                .endpoint(statusesSampleEndpoint)
                .authentication(authentication)
                .processor(new StringDelimitedProcessor(this.queue))
                .build();
        this.client.connect();
    }

    /**
     * Makes {@code numberOfTweets} poll attempts. An attempt that times out emits nothing, so
     * fewer messages than attempts may be collected.
     *
     * @param collector receiver of the raw messages
     */
    protected void collectFiniteMessages(Collector<String> collector) {
        LOG.info("Collecting tweets");
        for (int attempt = 0; attempt < this.numberOfTweets; attempt++) {
            collectOneMessage(collector);
        }
        LOG.info("Collecting tweets finished");
    }

    /**
     * Polls for messages in an endless loop; this method only ends by throwing.
     *
     * @param collector receiver of the raw messages
     */
    protected void collectMessages(Collector<String> collector) {
        LOG.info("Tweet-stream begins");
        while (true) {
            collectOneMessage(collector);
        }
    }

    /**
     * Waits up to {@code waitSec} seconds for one message and forwards it if one arrived.
     * A client that has already terminated is reported in the log, but the queue is still polled.
     *
     * @param collector receiver of the raw message
     * @throws RuntimeException if the waiting thread is interrupted
     */
    protected void collectOneMessage(Collector<String> collector) {
        if (this.client.isDone()) {
            LOG.error("Client connection closed unexpectedly: {}", this.client.getExitEvent().getMessage());
        }
        try {
            String message = this.queue.poll(this.waitSec, TimeUnit.SECONDS);
            if (message != null) {
                collector.collect(message);
            } else {
                LOG.info("Did not receive a message in {} seconds", this.waitSec);
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
        }
    }

    /** Stops the client if one was created. */
    private void closeConnection() {
        LOG.info("Initiating connection close");
        // The client is missing when open() failed before it was built; do not mask that failure.
        if (this.client != null) {
            this.client.stop();
        }
        LOG.info("Connection closed successfully");
    }

    /**
     * Returns the capacity used for the message queue.
     *
     * @return the queue capacity
     */
    public int getQueueSize() {
        return this.queueSize;
    }

    /**
     * Sets the capacity of the message queue. The value is read when the connection is
     * initialized, so it has to be set before the source is opened.
     *
     * @param i the queue capacity
     */
    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    /**
     * Returns how long a single poll waits for a message.
     *
     * @return the wait time in seconds
     */
    public int getWaitSec() {
        return this.waitSec;
    }

    /**
     * Sets how long a single poll waits for a message.
     *
     * @param i the wait time in seconds
     */
    public void setWaitSec(int i) {
        this.waitSec = i;
    }
}
