package org.apache.streams.rss.provider;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedInput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.rss.serializer.SyndEntrySerializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/rss/provider/RssStreamProviderTask.class */
/**
 * A {@link Runnable} that reads one RSS feed and places its entries, as
 * {@link StreamsDatum}s, onto a shared {@link BlockingQueue}.
 *
 * <p>Each entry is converted to an {@link ObjectNode} by {@link SyndEntrySerializer} and tagged
 * with the feed URL under the {@code rssFeed} key. An entry is queued when its
 * {@code publishedDate} is after {@code publishedSince}, or when it has no parseable date.
 * In perpetual mode, entries whose id was recorded by the previous run of the same feed are
 * skipped, and the ids collected by this run replace the recorded set.
 */
public class RssStreamProviderTask implements Runnable {
    private static final int DEFAULT_TIME_OUT = 10000;
    private static final String RSS_KEY = "rssFeed";
    private static final String URI_KEY = "uri";
    private static final String LINK_KEY = "link";
    private static final String DATE_KEY = "publishedDate";
    private final BlockingQueue<StreamsDatum> dataQueue;
    private final String rssFeed;
    private final int timeOut;
    private final SyndEntrySerializer serializer;
    private final DateTime publishedSince;
    private final boolean perpetual;
    private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class);

    /**
     * Entry ids collected by the most recent perpetual-mode run, keyed by feed URL string.
     * Shared by all task instances.
     */
    @VisibleForTesting
    protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>();

    /**
     * Creates a non-perpetual task with the default timeout and a {@code publishedSince}
     * of thirty years before now.
     *
     * @param blockingQueue queue that receives the feed entries
     * @param str           URL of the RSS feed
     */
    public RssStreamProviderTask(BlockingQueue<StreamsDatum> blockingQueue, String str) {
        this(blockingQueue, str, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false);
    }

    /**
     * Creates a non-perpetual task with a {@code publishedSince} of thirty years before now.
     *
     * @param blockingQueue queue that receives the feed entries
     * @param str           URL of the RSS feed
     * @param i             connect/read timeout passed to the {@link URLConnection}, in milliseconds
     */
    public RssStreamProviderTask(BlockingQueue<StreamsDatum> blockingQueue, String str, int i) {
        this(blockingQueue, str, new DateTime().minusYears(30), i, false);
    }

    /**
     * Creates a non-perpetual task with the default timeout.
     *
     * @param blockingQueue queue that receives the feed entries
     * @param str           URL of the RSS feed
     * @param dateTime      only entries published after this instant are queued
     */
    public RssStreamProviderTask(BlockingQueue<StreamsDatum> blockingQueue, String str, DateTime dateTime) {
        this(blockingQueue, str, dateTime, DEFAULT_TIME_OUT, false);
    }

    /**
     * Creates a fully configured task.
     *
     * @param blockingQueue queue that receives the feed entries
     * @param str           URL of the RSS feed
     * @param dateTime      only entries published after this instant are queued
     * @param i             connect/read timeout passed to the {@link URLConnection}, in milliseconds
     * @param z             {@code true} for perpetual mode: skip entries recorded by the previous
     *                      run and record this run's entry ids in {@link #PREVIOUSLY_SEEN}
     */
    public RssStreamProviderTask(BlockingQueue<StreamsDatum> blockingQueue, String str, DateTime dateTime, int i, boolean z) {
        this.dataQueue = blockingQueue;
        this.rssFeed = str;
        this.timeOut = i;
        this.publishedSince = dateTime;
        this.serializer = new SyndEntrySerializer();
        this.perpetual = z;
    }

    /**
     * Returns the feed URL this task reads.
     *
     * @return the feed URL string given at construction
     */
    public String getRssFeed() {
        return this.rssFeed;
    }

    /**
     * Reads the feed once and queues its entries. In perpetual mode the ids collected by this
     * run replace the feed's entry in {@link #PREVIOUSLY_SEEN}. I/O and feed-parsing failures
     * are logged and do not propagate.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            Set<String> batch = queueFeedEntries(new URL(this.rssFeed));
            if (this.perpetual) {
                PREVIOUSLY_SEEN.put(getRssFeed(), batch);
            }
        } catch (IOException | FeedException e) {
            LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, e);
        }
    }

    /**
     * Fetches and parses the feed at {@code url}, queues every eligible entry, and returns the
     * ids of all entries found in the feed (queued or not).
     *
     * @param url location of the feed
     * @return ids of every entry in the feed that has a {@code uri} or {@code link}
     * @throws IOException   if the feed cannot be opened or read
     * @throws FeedException if the feed content cannot be parsed
     */
    @VisibleForTesting
    protected Set<String> queueFeedEntries(URL url) throws IOException, FeedException {
        Set<String> batch = Sets.newConcurrentHashSet();
        URLConnection connection = url.openConnection();
        connection.setConnectTimeout(this.timeOut);
        // Bound the read as well, otherwise a server that accepts the connection but never
        // answers would block this task forever.
        connection.setReadTimeout(this.timeOut);
        // try-with-resources closes the reader (and the underlying connection stream) even
        // when parsing fails.
        try (InputStreamReader reader = new InputStreamReader(connection.getInputStream())) {
            for (Object rawEntry : new SyndFeedInput().build(reader).getEntries()) {
                ObjectNode node = this.serializer.deserialize((SyndEntry) rawEntry);
                node.put(RSS_KEY, this.rssFeed);
                String entryId = determineId(node);
                // The concurrent set rejects null elements, so entries without an id are
                // simply not tracked.
                if (entryId != null) {
                    batch.add(entryId);
                }
                if (!shouldQueue(node, entryId)) {
                    continue;
                }
                try {
                    this.dataQueue.put(new StreamsDatum(node));
                    LOGGER.debug("Added entry, {}, to provider queue.", entryId);
                } catch (InterruptedException e) {
                    LOGGER.error("Interupted Exception.");
                    // Restore the flag so the owner of this thread can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return batch;
    }

    /**
     * Decides whether an entry should be placed on the queue. Entries with a parseable
     * published date must be after {@code publishedSince}; entries with no date or an
     * unparseable date pass by default. In perpetual mode, entries seen in the previous
     * run are rejected.
     */
    private boolean shouldQueue(ObjectNode node, String entryId) {
        JsonNode published = node.get(DATE_KEY);
        if (published == null) {
            LOGGER.debug("No published date present, attempting to add node to queue by default.");
        } else {
            try {
                if (!RFC3339Utils.parseToUTC(published.asText()).isAfter(this.publishedSince)) {
                    return false;
                }
            } catch (Exception ignored) {
                // Deliberately best-effort: an unparseable date must not drop the entry.
                LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
            }
        }
        return !this.perpetual || !seenBefore(entryId, this.rssFeed);
    }

    /**
     * Picks the identifier of an entry: its {@code uri} if present and non-empty, otherwise
     * its {@code link}.
     *
     * @return the identifier, or {@code null} when neither field holds non-empty text
     */
    private String determineId(ObjectNode objectNode) {
        String uri = nonEmptyText(objectNode.get(URI_KEY));
        return uri != null ? uri : nonEmptyText(objectNode.get(LINK_KEY));
    }

    /** Returns the node's text value, or {@code null} if it is missing, non-textual or empty. */
    private static String nonEmptyText(JsonNode node) {
        if (node == null) {
            return null;
        }
        String text = node.textValue();
        return (text == null || text.isEmpty()) ? null : text;
    }

    /**
     * Reports whether {@code str} was recorded for feed {@code str2} by the previous
     * perpetual-mode run. A {@code null} id is never considered seen.
     */
    private boolean seenBefore(String str, String str2) {
        if (str == null) {
            return false;
        }
        Set<String> seenIds = PREVIOUSLY_SEEN.get(str2);
        return seenIds != null && seenIds.contains(str);
    }
}
