package org.apache.flink.streaming.connectors.twitter;

import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.common.DelimitedStreamReader;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/twitter/TwitterSource.class */
/**
 * Source function that opens a streaming connection through the Hosebird client
 * ({@link BasicClient}) and emits every line read from that stream as a {@code String}.
 *
 * <p>The connection is configured from a {@link Properties} object. The four OAuth1 keys
 * ({@link #CONSUMER_KEY}, {@link #CONSUMER_SECRET}, {@link #TOKEN}, {@link #TOKEN_SECRET})
 * are mandatory; {@link #CLIENT_NAME}, {@link #CLIENT_HOSTS} and {@link #CLIENT_BUFFER_SIZE}
 * are optional and fall back to defaults.
 *
 * <p>By default the source reads from a {@link StatusesSampleEndpoint}; a different endpoint
 * can be supplied with {@link #setCustomEndpointInitializer(EndpointInitializer)}.
 *
 * <p>Threading: {@link #run} blocks until {@link #cancel()} or {@link #close()} is invoked.
 * Those calls are expected to come from another thread, so the fields they share with
 * {@code run} are {@code volatile}.
 */
public class TwitterSource extends RichSourceFunction<String> {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
    private static final long serialVersionUID = 1;

    /** Required property: consumer key handed to {@link OAuth1}. */
    public static final String CONSUMER_KEY = "twitter-source.consumerKey";
    /** Required property: consumer secret handed to {@link OAuth1}. */
    public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
    /** Required property: token handed to {@link OAuth1}. */
    public static final String TOKEN = "twitter-source.token";
    /** Required property: token secret handed to {@link OAuth1}. */
    public static final String TOKEN_SECRET = "twitter-source.tokenSecret";
    /** Optional property: client name; defaults to {@code "flink-twitter-source"}. */
    public static final String CLIENT_NAME = "twitter-source.name";
    /** Optional property: hosts to connect to; defaults to {@link Constants#STREAM_HOST}. */
    public static final String CLIENT_HOSTS = "twitter-source.hosts";
    /** Optional property: buffer size of the stream reader, parsed as an int; defaults to {@code 50000}. */
    public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";

    /** Connection settings supplied at construction time. */
    private final Properties properties;
    /** Active client; assigned in {@link #run} and stopped in {@link #close()}, possibly from another thread. */
    private transient volatile BasicClient client;
    /** Monitor that {@link #run} waits on; created in {@link #open}. */
    private transient Object waitLock;
    /** Creates the endpoint the client connects to. */
    private EndpointInitializer initializer = new SampleStatusesEndpoint();
    /**
     * Keeps the wait loop in {@link #run} alive. Volatile because it is cleared by
     * {@link #close()}/{@link #cancel()} while {@code run} polls it. The field is transient,
     * so {@code run} sets it explicitly instead of relying on this initializer.
     */
    private transient volatile boolean running = true;

    /** Factory for the {@link StreamingEndpoint} the source connects to. */
    public interface EndpointInitializer {
        /**
         * Creates the endpoint to stream from.
         *
         * @return the endpoint passed to the client builder
         */
        StreamingEndpoint createEndpoint();
    }

    /** Default initializer: a {@link StatusesSampleEndpoint} with stall warnings and delimiting turned off. */
    private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
        private SampleStatusesEndpoint() {
        }

        @Override // org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer
        public StreamingEndpoint createEndpoint() {
            StatusesSampleEndpoint statusesSampleEndpoint = new StatusesSampleEndpoint();
            statusesSampleEndpoint.stallWarnings(false);
            statusesSampleEndpoint.delimited(false);
            return statusesSampleEndpoint;
        }
    }

    /**
     * Creates a source configured by the given properties.
     *
     * @param properties connection settings; must contain {@link #CONSUMER_KEY},
     *     {@link #CONSUMER_SECRET}, {@link #TOKEN} and {@link #TOKEN_SECRET}
     * @throws NullPointerException if {@code properties} is {@code null}
     * @throws IllegalArgumentException if one of the required keys is missing
     */
    public TwitterSource(Properties properties) {
        Objects.requireNonNull(properties, "properties");
        checkProperty(properties, CONSUMER_KEY);
        checkProperty(properties, CONSUMER_SECRET);
        checkProperty(properties, TOKEN);
        checkProperty(properties, TOKEN_SECRET);
        this.properties = properties;
    }

    /**
     * Verifies that a required key is present.
     *
     * @throws IllegalArgumentException if {@code properties} does not contain {@code str}
     */
    private static void checkProperty(Properties properties, String str) {
        if (!properties.containsKey(str)) {
            throw new IllegalArgumentException("Required property '" + str + "' not set.");
        }
    }

    /**
     * Replaces the default endpoint initializer.
     *
     * @param endpointInitializer initializer to use; must be non-null and pass
     *     {@link ClosureCleaner#ensureSerializable(Object)}
     * @throws NullPointerException if {@code endpointInitializer} is {@code null}
     */
    public void setCustomEndpointInitializer(EndpointInitializer endpointInitializer) {
        Objects.requireNonNull(endpointInitializer, "Initializer has to be set");
        ClosureCleaner.ensureSerializable(endpointInitializer);
        this.initializer = endpointInitializer;
    }

    /** Creates the monitor object that {@link #run} waits on. */
    public void open(Configuration configuration) throws Exception {
        this.waitLock = new Object();
    }

    /**
     * Builds and connects the client, then blocks until the source is closed or cancelled.
     * Each line read from the stream is forwarded to {@code sourceContext} by the client's
     * message processor.
     *
     * @param sourceContext context that receives the lines read from the stream
     * @throws Exception if waiting is interrupted or client setup fails
     */
    public void run(final SourceFunction.SourceContext<String> sourceContext) throws Exception {
        // Raise the flag before connecting: setting it after connect() would overwrite a
        // cancel() that arrived while the connection was being established and leave the
        // wait loop below spinning forever.
        this.running = true;

        LOG.info("Initializing Twitter Streaming API connection");
        StreamingEndpoint endpoint = this.initializer.createEndpoint();
        OAuth1 auth = new OAuth1(
                this.properties.getProperty(CONSUMER_KEY),
                this.properties.getProperty(CONSUMER_SECRET),
                this.properties.getProperty(TOKEN),
                this.properties.getProperty(TOKEN_SECRET));

        this.client = new ClientBuilder()
                .name(this.properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
                .hosts(this.properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
                .endpoint(endpoint)
                .authentication(auth)
                .processor(new HosebirdMessageProcessor() {
                    private DelimitedStreamReader reader;

                    @Override // com.twitter.hbc.core.processor.HosebirdMessageProcessor
                    public void setup(InputStream inputStream) {
                        int bufferSize = Integer.parseInt(
                                TwitterSource.this.properties.getProperty(TwitterSource.CLIENT_BUFFER_SIZE, "50000"));
                        this.reader = new DelimitedStreamReader(inputStream, Constants.DEFAULT_CHARSET, bufferSize);
                    }

                    @Override // com.twitter.hbc.core.processor.HosebirdMessageProcessor
                    public boolean process() throws IOException, InterruptedException {
                        sourceContext.collect(this.reader.readLine());
                        return true;
                    }
                })
                .build();
        this.client.connect();
        LOG.info("Twitter Streaming API connection established successfully");

        // The processor above does the emitting; this thread only has to stay alive
        // until close() clears the flag. The timed wait bounds the shutdown latency even
        // if the notify() from close() is missed.
        while (this.running) {
            synchronized (this.waitLock) {
                this.waitLock.wait(100L);
            }
        }
    }

    /**
     * Stops the client (if one was created) and wakes up {@link #run}. Safe to call more
     * than once and before {@link #open} has been invoked.
     */
    public void close() {
        this.running = false;
        LOG.info("Closing source");
        BasicClient currentClient = this.client;
        if (currentClient != null) {
            currentClient.stop();
        }
        // waitLock only exists after open(); without this guard a close()/cancel() on a
        // source that was never opened would throw a NullPointerException.
        Object lock = this.waitLock;
        if (lock != null) {
            synchronized (lock) {
                lock.notify();
            }
        }
    }

    /** Cancels the source by delegating to {@link #close()}. */
    public void cancel() {
        LOG.info("Cancelling Twitter source");
        close();
    }
}
