/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.twitter;

import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwitterSource
extends RichSourceFunction<String> {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
    private static final long serialVersionUID = 1L;
    private String authPath;
    protected transient BlockingQueue<String> queue;
    protected int queueSize = 10000;
    private transient BasicClient client;
    private int waitSec = 5;
    private int maxNumberOfTweets;
    private int currentNumberOfTweets;
    private volatile transient boolean isRunning;

    public TwitterSource(String authPath) {
        this.authPath = authPath;
        this.maxNumberOfTweets = -1;
    }

    public TwitterSource(String authPath, int numberOfTweets) {
        this.authPath = authPath;
        this.maxNumberOfTweets = numberOfTweets;
    }

    public void open(Configuration parameters) throws Exception {
        this.initializeConnection();
        this.currentNumberOfTweets = 0;
        this.isRunning = true;
    }

    protected void initializeConnection() {
        if (LOG.isInfoEnabled()) {
            LOG.info("Initializing Twitter Streaming API connection");
        }
        this.queue = new LinkedBlockingQueue<String>(this.queueSize);
        StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
        endpoint.stallWarnings(false);
        OAuth1 auth = this.authenticate();
        this.initializeClient(endpoint, auth);
        if (LOG.isInfoEnabled()) {
            LOG.info("Twitter Streaming API connection established successfully");
        }
    }

    protected OAuth1 authenticate() {
        Properties authenticationProperties = this.loadAuthenticationProperties();
        return new OAuth1(authenticationProperties.getProperty("consumerKey"), authenticationProperties.getProperty("consumerSecret"), authenticationProperties.getProperty("token"), authenticationProperties.getProperty("secret"));
    }

    private Properties loadAuthenticationProperties() {
        Properties properties = new Properties();
        try (FileInputStream input = new FileInputStream(this.authPath);){
            properties.load(input);
        }
        catch (Exception e) {
            throw new RuntimeException("Cannot open .properties file: " + this.authPath, e);
        }
        return properties;
    }

    protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
        this.client = new ClientBuilder().name("twitterSourceClient").hosts("https://stream.twitter.com").endpoint(endpoint).authentication(auth).processor(new StringDelimitedProcessor(this.queue)).build();
        this.client.connect();
    }

    public void close() {
        if (LOG.isInfoEnabled()) {
            LOG.info("Initiating connection close");
        }
        if (this.client != null) {
            this.client.stop();
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Connection closed successfully");
        }
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    public int getWaitSec() {
        return this.waitSec;
    }

    public void setWaitSec(int waitSec) {
        this.waitSec = waitSec;
    }

    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        while (this.isRunning) {
            if (this.client.isDone()) {
                if (!LOG.isErrorEnabled()) break;
                LOG.error("Client connection closed unexpectedly: {}", (Object)this.client.getExitEvent().getMessage());
                break;
            }
            ctx.collect((Object)this.queue.take());
            if (this.maxNumberOfTweets == -1 || this.currentNumberOfTweets < this.maxNumberOfTweets) continue;
            break;
        }
    }

    public void cancel() {
        this.isRunning = false;
    }
}

