package org.apache.pulsar.connect.twitter;

import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.common.DelimitedStreamReader;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.pulsar.connect.core.PushSource;
import org.apache.pulsar.connect.core.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/connect/twitter/TwitterFireHose.class */
public class TwitterFireHose implements PushSource<String> {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
    private Object waitObject;
    private Function<Record<String>, CompletableFuture<Void>> consumeFunction;

    /* loaded from: input_file:org/apache/pulsar/connect/twitter/TwitterFireHose$EndpointInitializer.class */
    public interface EndpointInitializer {
        StreamingEndpoint createEndpoint();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/connect/twitter/TwitterFireHose$SampleStatusesEndpoint.class */
    public static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
        private SampleStatusesEndpoint() {
        }

        @Override // org.apache.pulsar.connect.twitter.TwitterFireHose.EndpointInitializer
        public StreamingEndpoint createEndpoint() {
            StatusesSampleEndpoint statusesSampleEndpoint = new StatusesSampleEndpoint();
            statusesSampleEndpoint.stallWarnings(false);
            statusesSampleEndpoint.delimited(false);
            return statusesSampleEndpoint;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/connect/twitter/TwitterFireHose$TwitterRecord.class */
    private static class TwitterRecord implements Record<String> {
        private String tweet;

        public TwitterRecord(String str) {
            this.tweet = str;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m1getValue() {
            return this.tweet;
        }
    }

    public void open(Map<String, Object> map) throws IOException {
        TwitterFireHoseConfig load = TwitterFireHoseConfig.load(map);
        if (load.getConsumerKey() == null || load.getConsumerSecret() == null || load.getToken() != null || load.getTokenSecret() == null) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.waitObject = new Object();
        startThread(load);
    }

    public void setConsumer(Function<Record<String>, CompletableFuture<Void>> function) {
        this.consumeFunction = function;
    }

    public void close() throws Exception {
        stopThread();
    }

    private void startThread(final TwitterFireHoseConfig twitterFireHoseConfig) {
        BasicClient build = new ClientBuilder().name(twitterFireHoseConfig.getClientName()).hosts(twitterFireHoseConfig.getClientHosts()).endpoint(new SampleStatusesEndpoint().createEndpoint()).authentication(new OAuth1(twitterFireHoseConfig.getConsumerKey(), twitterFireHoseConfig.getConsumerSecret(), twitterFireHoseConfig.getToken(), twitterFireHoseConfig.getTokenSecret())).processor(new HosebirdMessageProcessor() { // from class: org.apache.pulsar.connect.twitter.TwitterFireHose.1
            public DelimitedStreamReader reader;

            public void setup(InputStream inputStream) {
                this.reader = new DelimitedStreamReader(inputStream, Constants.DEFAULT_CHARSET, twitterFireHoseConfig.getClientBufferSize());
            }

            public boolean process() throws IOException, InterruptedException {
                try {
                    TwitterFireHose.this.consumeFunction.apply(new TwitterRecord(this.reader.readLine()));
                    return true;
                } catch (Exception e) {
                    TwitterFireHose.LOG.error("Exception thrown");
                    return true;
                }
            }
        }).build();
        Thread thread = new Thread(() -> {
            LOG.info("Started the Twitter FireHose Runner Thread");
            build.connect();
            LOG.info("Twitter Streaming API connection established successfully");
            try {
                synchronized (this.waitObject) {
                    this.waitObject.wait();
                }
            } catch (Exception e) {
                LOG.info("Got a exception in waitObject");
            }
            LOG.debug("Closing Twitter Streaming API connection");
            build.stop();
            LOG.info("Twitter Streaming API connection closed");
            LOG.info("Twitter FireHose Runner Thread ending");
        });
        thread.setName("TwitterFireHoseRunner");
        thread.start();
    }

    private void stopThread() {
        LOG.info("Source closed");
        synchronized (this.waitObject) {
            this.waitObject.notify();
        }
    }
}
