package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.BasicAuth;
import com.twitter.hbc.httpclient.auth.OAuth1;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/twitter/provider/TwitterStreamProvider.class */
/**
 * Streams provider that reads from one of Twitter's streaming endpoints through a Hosebird
 * {@link BasicClient}.
 *
 * <p>Lifecycle as implemented here: {@link #prepare(Object)} builds the client from the
 * {@link TwitterStreamConfiguration}, {@link #startStream()} connects it, {@link #readCurrent()}
 * drains whatever has been queued via {@link #addDatum(Future)}, and {@link #cleanUp()} stops the
 * client and the processor.
 *
 * <p>NOTE(review): {@code countersCurrent} is swapped inside the synchronized
 * {@link #readCurrent()} but incremented from the unsynchronized {@link #addDatum(Future)};
 * confirm whether callers of {@code addDatum} run on another thread.
 */
public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable {
    public static final String STREAMS_ID = "TwitterStreamProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);

    /** Capacity of the internal future queue and soft cap on datums drained per read. */
    private static final int MAX_BATCH = 1000;

    private TwitterStreamConfiguration config;
    /** Futures handed over through {@link #addDatum(Future)}; created by {@link #prepare(Object)}. */
    private volatile Queue<Future<List<StreamsDatum>>> providerQueue;
    private Authentication auth;
    protected StreamingEndpoint endpoint;
    /** Remains {@code null} until {@link #prepare(Object)} completes successfully. */
    private BasicClient client;
    private final AtomicBoolean running = new AtomicBoolean(false);
    protected TwitterStreamHelper processor = new TwitterStreamHelper(this);
    /** Counts accumulated since the last {@link #readCurrent()}. */
    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
    /** Counts accumulated over every completed {@link #readCurrent()}. */
    private DatumStatusCounter countersTotal = new DatumStatusCounter();

    /**
     * Stand-alone runner: streams tweets to a file, one JSON document per line, until the
     * provider stops running.
     *
     * @param strArr {@code strArr[0]} is the path of the configuration file, {@code strArr[1]}
     *               the path of the output file
     * @throws IllegalArgumentException if fewer than two arguments are given or the configuration
     *                                  file does not exist
     */
    public static void main(String[] strArr) {
        Preconditions.checkArgument(strArr.length >= 2, "Expected arguments: <config file> <output file>");
        String configPath = strArr[0];
        String outputPath = strArr[1];

        Config reference = ConfigFactory.load();
        File configFile = new File(configPath);
        // Checked unconditionally; an `assert` would only fire when the JVM runs with -ea.
        Preconditions.checkArgument(configFile.exists(), "Configuration file does not exist: %s", configFile);
        Config typesafe = ConfigFactory.parseFileAnySyntax(configFile, ConfigParseOptions.defaults().setAllowMissing(false))
                .withFallback(reference)
                .resolve();

        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
        TwitterStreamConfiguration twitterConfiguration = (TwitterStreamConfiguration)
                new ComponentConfigurator(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
        TwitterStreamProvider provider = new TwitterStreamProvider(twitterConfiguration);
        StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList("EEE MMM dd HH:mm:ss Z yyyy"));

        // try-with-resources guarantees the file is flushed and closed even if streaming fails;
        // the explicit charset keeps non-ASCII tweet text intact regardless of platform default.
        try (FileOutputStream fileStream = new FileOutputStream(outputPath);
             PrintStream out = new PrintStream(new BufferedOutputStream(fileStream), false, "UTF-8")) {
            try {
                provider.prepare(twitterConfiguration);
                provider.startStream();
                do {
                    Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
                    for (StreamsDatum datum : provider.readCurrent()) {
                        try {
                            out.println(mapper.writeValueAsString(datum.getDocument()));
                        } catch (JsonProcessingException e) {
                            // One unserializable document must not abort the whole run.
                            LOGGER.warn("Unable to serialize datum document", e);
                        }
                    }
                } while (provider.isRunning());
            } finally {
                // Stop the client and processor on every exit path, not only the normal one.
                provider.cleanUp();
            }
            out.flush();
        } catch (FileNotFoundException e) {
            LOGGER.error("FileNotFoundException", e);
        } catch (UnsupportedEncodingException e) {
            LOGGER.error("UnsupportedEncodingException", e);
        }
    }

    /** Returns the configuration this provider was constructed with or last assigned. */
    public TwitterStreamConfiguration getConfig() {
        return this.config;
    }

    /** Replaces the configuration; takes effect on the next {@link #prepare(Object)}. */
    public void setConfig(TwitterStreamConfiguration twitterStreamConfiguration) {
        this.config = twitterStreamConfiguration;
    }

    /**
     * Creates a provider whose configuration is detected from {@code StreamsConfigurator.config}
     * under the {@code "twitter"} path.
     */
    public TwitterStreamProvider() {
        this.config = (TwitterStreamConfiguration) new ComponentConfigurator(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter");
    }

    /**
     * Creates a provider with an explicit configuration.
     *
     * @param twitterStreamConfiguration configuration to use
     */
    public TwitterStreamProvider(TwitterStreamConfiguration twitterStreamConfiguration) {
        this.config = twitterStreamConfiguration;
    }

    /** Returns {@link #STREAMS_ID}. */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Connects the client and marks the provider as running.
     *
     * @throws NullPointerException if {@link #prepare(Object)} has not built a client, which
     *                              includes the cases where it logged an error and returned early
     */
    public void startStream() {
        Objects.requireNonNull(this.client, "Twitter client has not been built; prepare() must complete successfully before startStream()");
        this.client.connect();
        this.running.set(true);
    }

    /**
     * Drains the currently queued datums into a new result set and attaches the status counts
     * gathered since the previous call; those counts are then folded into the running total.
     *
     * @return result set backed by a fresh queue holding the drained datums
     */
    public synchronized StreamsResultSet readCurrent() {
        Queue<StreamsDatum> drained = new LinkedBlockingDeque<>();
        drainTo(drained);

        StreamsResultSet current = new StreamsResultSet(drained);
        current.setCounter(new DatumStatusCounter());
        current.getCounter().add(this.countersCurrent);

        this.countersTotal.add(this.countersCurrent);
        this.countersCurrent = new DatumStatusCounter();
        return current;
    }

    /**
     * Not supported by this provider.
     *
     * @throws NotImplementedException always
     */
    public StreamsResultSet readNew(BigInteger bigInteger) {
        throw new NotImplementedException();
    }

    /**
     * Not supported by this provider.
     *
     * @throws NotImplementedException always
     */
    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        throw new NotImplementedException();
    }

    /** Returns {@code true} while the stream has been started and the client is not done. */
    public boolean isRunning() {
        return this.running.get() && !this.client.isDone();
    }

    /**
     * Resolves host, endpoint and authentication from the configuration and builds the client.
     *
     * <p>If the configured endpoint name is not recognised, or neither basic-auth nor OAuth
     * settings are present, an error is logged and the method returns without building a client.
     *
     * @param obj ignored; the provider's own configuration is used
     * @throws NullPointerException if the endpoint name or a required credential is {@code null}
     */
    public void prepare(Object obj) {
        String endpointName = this.config.getEndpoint();
        Objects.requireNonNull(endpointName);

        HttpHosts hosts;
        if (endpointName.equals("userstream")) {
            hosts = new HttpHosts("https://userstream.twitter.com");
            UserstreamEndpoint userstream = new UserstreamEndpoint();
            userstream.withFollowings(true);
            userstream.withUser(false);
            userstream.allReplies(false);
            this.endpoint = userstream;
        } else if (endpointName.equals("sample")) {
            hosts = new HttpHosts("https://stream.twitter.com");
            boolean hasTrack = this.config.getTrack() != null && !this.config.getTrack().isEmpty();
            boolean hasFollow = this.config.getFollow() != null && !this.config.getFollow().isEmpty();
            if (hasTrack || hasFollow) {
                // Any track/follow filter switches from the sample to the filter endpoint.
                LOGGER.debug("***\tPRESENT\t***");
                StatusesFilterEndpoint filter = new StatusesFilterEndpoint();
                if (hasTrack) {
                    filter.trackTerms(this.config.getTrack());
                }
                if (hasFollow) {
                    filter.followings(this.config.getFollow());
                }
                this.endpoint = filter;
            } else {
                this.endpoint = new StatusesSampleEndpoint();
            }
        } else if (endpointName.endsWith("firehose")) {
            hosts = new HttpHosts("https://stream.twitter.com");
            this.endpoint = new StatusesFirehoseEndpoint();
        } else {
            LOGGER.error("NO ENDPOINT RESOLVED");
            return;
        }

        Authentication resolvedAuth = resolveAuthentication();
        if (resolvedAuth == null) {
            LOGGER.error("NO AUTH RESOLVED");
            return;
        }
        this.auth = resolvedAuth;

        LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[]{hosts, this.endpoint, this.auth});
        this.providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);
        this.client = new ClientBuilder()
                .name("apache/streams/streams-contrib/streams-provider-twitter")
                .hosts(hosts)
                .endpoint(this.endpoint)
                .authentication(this.auth)
                .connectionTimeout(1200000) // presumably milliseconds — confirm against ClientBuilder docs
                .processor(this.processor)
                .build();
    }

    /**
     * Builds the authentication from the configuration, preferring basic auth over OAuth.
     *
     * @return the authentication, or {@code null} when neither scheme is configured
     * @throws NullPointerException if the chosen scheme is missing one of its credentials
     */
    private Authentication resolveAuthentication() {
        if (this.config.getBasicauth() != null) {
            Objects.requireNonNull(this.config.getBasicauth().getUsername());
            Objects.requireNonNull(this.config.getBasicauth().getPassword());
            return new BasicAuth(this.config.getBasicauth().getUsername(), this.config.getBasicauth().getPassword());
        }
        if (this.config.getOauth() == null) {
            return null;
        }
        Objects.requireNonNull(this.config.getOauth().getConsumerKey());
        Objects.requireNonNull(this.config.getOauth().getConsumerSecret());
        Objects.requireNonNull(this.config.getOauth().getAccessToken());
        Objects.requireNonNull(this.config.getOauth().getAccessTokenSecret());
        return new OAuth1(this.config.getOauth().getConsumerKey(), this.config.getOauth().getConsumerSecret(), this.config.getOauth().getAccessToken(), this.config.getOauth().getAccessTokenSecret());
    }

    /**
     * Stops the client (when one was built), cleans up the processor and clears the running flag.
     * Safe to call when {@link #prepare(Object)} never built a client.
     */
    public void cleanUp() {
        if (this.client != null) {
            this.client.stop();
        }
        this.processor.cleanUp();
        this.running.set(false);
    }

    /** Returns the counter holding totals folded in by {@link #readCurrent()}. */
    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }

    /**
     * Enqueues a pending batch of datums and records the outcome in the current counter.
     *
     * @param future batch to enqueue
     * @return {@code true} if the batch was enqueued, {@code false} if enqueuing threw
     */
    public boolean addDatum(Future<List<StreamsDatum>> future) {
        try {
            ComponentUtils.offerUntilSuccess(future, this.providerQueue);
            this.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
            return true;
        } catch (Exception e) {
            this.countersCurrent.incrementStatus(DatumStatus.FAIL);
            LOGGER.warn("Unable to enqueue item from Twitter stream", e);
            return false;
        }
    }

    /**
     * Moves queued datums into {@code queue} until the internal queue is empty or the batch limit
     * has been passed.
     *
     * <p>The limit is a soft cap: it is checked once per polled future, so the final batch is
     * always transferred in full and the total can exceed {@code MAX_BATCH}.
     *
     * @param queue destination for the drained datums
     */
    protected void drainTo(Queue<StreamsDatum> queue) {
        int drained = 0;
        while (!this.providerQueue.isEmpty() && drained <= MAX_BATCH) {
            for (StreamsDatum datum : pollForDatum()) {
                ComponentUtils.offerUntilSuccess(datum, queue);
                drained++;
            }
        }
    }

    /**
     * Takes the next future off the internal queue and waits for its result.
     *
     * @return the datums produced by the future; an empty mutable list if the queue was empty,
     *         the future failed, or the wait was interrupted (which also triggers
     *         {@link #cleanUp()} and restores the interrupt flag)
     */
    protected List<StreamsDatum> pollForDatum() {
        Future<List<StreamsDatum>> next = this.providerQueue.poll();
        if (next == null) {
            // poll() returns null on an empty queue; do not dereference it.
            return new ArrayList<StreamsDatum>();
        }
        try {
            return next.get();
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for future.  Initiate shutdown.");
            cleanUp();
            Thread.currentThread().interrupt();
            return new ArrayList<StreamsDatum>();
        } catch (ExecutionException e) {
            LOGGER.warn("Error getting tweet from future", e);
            return new ArrayList<StreamsDatum>();
        }
    }
}
