package org.apache.nifi.processors.twitter;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Streams tweets from Twitter's streaming API v2. The stream provides a sample stream or a search stream based on previously uploaded rules. This processor also provides a pass through for certain fields of the tweet to be returned as part of the response. See https://developer.twitter.com/en/docs/twitter-api/data-dictionary/introduction for more information regarding the Tweet object model.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SupportsBatching
@Tags({"twitter", "tweets", "social media", "status", "json"})
@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "The MIME Type set to application/json"), @WritesAttribute(attribute = "tweets", description = "The number of Tweets in the FlowFile")})
@PrimaryNodeOnly
/* loaded from: input_file:org/apache/nifi/processors/twitter/ConsumeTwitter.class */
public class ConsumeTwitter extends AbstractProcessor {
    static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue(StreamEndpoint.SAMPLE_ENDPOINT.getEndpointName(), "Sample Stream", "Streams about one percent of all Tweets. https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream");
    static final AllowableValue ENDPOINT_SEARCH = new AllowableValue(StreamEndpoint.SEARCH_ENDPOINT.getEndpointName(), "Search Stream", "The search stream produces Tweets that match filtering rules configured on Twitter services. At least one well-formed filtering rule must be configured. https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream");
    public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder().name("stream-endpoint").displayName("Stream Endpoint").description("The source from which the processor will consume Tweets.").required(true).allowableValues(new AllowableValue[]{ENDPOINT_SAMPLE, ENDPOINT_SEARCH}).defaultValue(ENDPOINT_SAMPLE.getValue()).build();
    public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder().name("base-path").displayName("Base Path").description("The base path that the processor will use for making HTTP requests. The default value should be sufficient for most use cases.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("https://api.twitter.com").build();
    public static final PropertyDescriptor BEARER_TOKEN = new PropertyDescriptor.Builder().name("bearer-token").displayName("Bearer Token").description("The Bearer Token provided by Twitter.").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor QUEUE_SIZE = new PropertyDescriptor.Builder().name("queue-size").displayName("Queue Size").description("Maximum size of internal queue for streamed messages").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("10000").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("batch-size").displayName("Batch Size").description("The maximum size of the number of Tweets to be written to a single FlowFile. Will write fewer Tweets based on the number available in the queue at the time of processor invocation.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("1000").build();
    public static final PropertyDescriptor BACKOFF_ATTEMPTS = new PropertyDescriptor.Builder().name("backoff-attempts").displayName("Backoff Attempts").description("The number of reconnection tries the processor will attempt in the event of a disconnection of the stream for any reason, before throwing an exception. To start a stream after this exception occur and the connection is fixed, please stop and restart the processor. If the valueof this property is 0, then backoff will never occur and the processor will always need to be restartedif the stream fails.").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).defaultValue("5").build();
    public static final PropertyDescriptor BACKOFF_TIME = new PropertyDescriptor.Builder().name("backoff-time").displayName("Backoff Time").description("The duration to backoff before requesting a new stream ifthe current one fails for any reason. Will increase by factor of 2 every time a restart fails").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("1 mins").build();
    public static final PropertyDescriptor MAXIMUM_BACKOFF_TIME = new PropertyDescriptor.Builder().name("maximum-backoff-time").displayName("Maximum Backoff Time").description("The maximum duration to backoff to start attempting a new stream.It is recommended that this number be much higher than the 'Backoff Time' property").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("5 mins").build();
    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("connect-timeout").displayName("Connect Timeout").description("The maximum time in which client should establish a connection with the Twitter API before a time out. Setting the value to 0 disables connection timeouts.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("10 secs").build();
    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder().name("read-timeout").displayName("Read Timeout").description("The maximum time of inactivity between receiving tweets from Twitter through the API before a timeout. Setting the value to 0 disables read timeouts.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("10 secs").build();
    public static final PropertyDescriptor BACKFILL_MINUTES = new PropertyDescriptor.Builder().name("backfill-minutes").displayName("Backfill Minutes").description("The number of minutes (up to 5 minutes) of streaming data to be requested after a disconnect. Only available for project with academic research access. See https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features").required(true).defaultValue("0").addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor TWEET_FIELDS = new PropertyDescriptor.Builder().name("tweet-fields").displayName("Tweet Fields").description("A comma-separated list of tweet fields to be returned as part of the tweet. Refer to https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/tweet for proper usage. Possible field values include: attachments, author_id, context_annotations, conversation_id, created_at, entities, geo, id, in_reply_to_user_id, lang, non_public_metrics, organic_metrics, possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets, reply_settings, source, text, withheld").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor USER_FIELDS = new PropertyDescriptor.Builder().name("user-fields").displayName("User Fields").description("A comma-separated list of user fields to be returned as part of the tweet. Refer to https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/user for proper usage. Possible field values include: created_at, description, entities, id, location, name, pinned_tweet_id, profile_image_url, protected, public_metrics, url, username, verified, withheld").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor MEDIA_FIELDS = new PropertyDescriptor.Builder().name("media-fields").displayName("Media Fields").description("A comma-separated list of media fields to be returned as part of the tweet. Refer to https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/media for proper usage. Possible field values include: alt_text, duration_ms, height, media_key, non_public_metrics, organic_metrics, preview_image_url, promoted_metrics, public_metrics, type, url, width").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor POLL_FIELDS = new PropertyDescriptor.Builder().name("poll-fields").displayName("Poll Fields").description("A comma-separated list of poll fields to be returned as part of the tweet. Refer to https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/poll for proper usage. Possible field values include: duration_minutes, end_datetime, id, options, voting_status").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor PLACE_FIELDS = new PropertyDescriptor.Builder().name("place-fields").displayName("Place Fields").description("A comma-separated list of place fields to be returned as part of the tweet. Refer to https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/place for proper usage. Possible field values include: contained_within, country, country_code, full_name, geo, id, name, place_type").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor EXPANSIONS = new PropertyDescriptor.Builder().name("expansions").displayName("Expansions").description("A comma-separated list of expansions for objects in the returned tweet. See https://developer.twitter.com/en/docs/twitter-api/expansions for proper usage. Possible field values include: author_id, referenced_tweets.id, referenced_tweets.id.author_id, entities.mentions.username, attachments.poll_ids, attachments.media_keys ,in_reply_to_user_id, geo.place_id").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles containing an array of one or more Tweets").build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private TweetStreamService tweetStreamService;
    private volatile BlockingQueue<String> messageQueue;
    private final AtomicBoolean streamStarted = new AtomicBoolean(false);

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(ENDPOINT);
        arrayList.add(BASE_PATH);
        arrayList.add(BEARER_TOKEN);
        arrayList.add(QUEUE_SIZE);
        arrayList.add(BATCH_SIZE);
        arrayList.add(BACKOFF_ATTEMPTS);
        arrayList.add(BACKOFF_TIME);
        arrayList.add(MAXIMUM_BACKOFF_TIME);
        arrayList.add(CONNECT_TIMEOUT);
        arrayList.add(READ_TIMEOUT);
        arrayList.add(BACKFILL_MINUTES);
        arrayList.add(TWEET_FIELDS);
        arrayList.add(USER_FIELDS);
        arrayList.add(MEDIA_FIELDS);
        arrayList.add(POLL_FIELDS);
        arrayList.add(PLACE_FIELDS);
        arrayList.add(EXPANSIONS);
        this.descriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.messageQueue = new LinkedBlockingQueue(processContext.getProperty(QUEUE_SIZE).asInteger().intValue());
        this.streamStarted.set(false);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        startTweetStreamService(processContext);
        String poll = this.messageQueue.poll();
        if (poll == null) {
            processContext.yield();
            return;
        }
        AtomicInteger atomicInteger = new AtomicInteger(1);
        FlowFile write = processSession.write(processSession.create(), outputStream -> {
            String poll2;
            int intValue = processContext.getProperty(BATCH_SIZE).asInteger().intValue();
            outputStream.write(91);
            outputStream.write(poll.getBytes(StandardCharsets.UTF_8));
            while (atomicInteger.get() < intValue && (poll2 = this.messageQueue.poll()) != null) {
                outputStream.write(44);
                outputStream.write(poll2.getBytes(StandardCharsets.UTF_8));
                atomicInteger.getAndIncrement();
            }
            outputStream.write(93);
        });
        HashMap hashMap = new HashMap();
        hashMap.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        hashMap.put(CoreAttributes.FILENAME.key(), String.format("%s.json", UUID.randomUUID()));
        hashMap.put("tweets", Integer.toString(atomicInteger.get()));
        FlowFile putAllAttributes = processSession.putAllAttributes(write, hashMap);
        processSession.transfer(putAllAttributes, REL_SUCCESS);
        processSession.getProvenanceReporter().receive(putAllAttributes, this.tweetStreamService.getTransitUri(processContext.getProperty(ENDPOINT).getValue()));
    }

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeStateChange(PrimaryNodeState primaryNodeState) {
        if (primaryNodeState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
            stopTweetStreamService();
        }
    }

    @OnStopped
    public void onStopped() {
        stopTweetStreamService();
        emptyQueue();
    }

    private void startTweetStreamService(ProcessContext processContext) {
        if (this.streamStarted.compareAndSet(false, true)) {
            this.tweetStreamService = new TweetStreamService(processContext, this.messageQueue, getLogger());
            this.tweetStreamService.start();
        }
    }

    private void stopTweetStreamService() {
        if (this.streamStarted.compareAndSet(true, false)) {
            if (this.tweetStreamService != null) {
                this.tweetStreamService.stop();
            }
            this.tweetStreamService = null;
            if (this.messageQueue.isEmpty()) {
                return;
            }
            getLogger().warn("Stopped consuming stream: unprocessed messages [{}]", new Object[]{Integer.valueOf(this.messageQueue.size())});
        }
    }

    private void emptyQueue() {
        while (!this.messageQueue.isEmpty()) {
            this.messageQueue.poll();
        }
    }
}
