package org.apache.streams.rss.provider;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import java.math.BigInteger;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/rss/provider/RssStreamProvider.class */
public class RssStreamProvider implements StreamsProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
    private static final int MAX_SIZE = 1000;
    private RssStreamConfiguration config;
    private boolean perpetual;
    private Set<String> urlFeeds;
    private ExecutorService executor;
    private BlockingQueue<StreamsDatum> dataQueue;
    private AtomicBoolean isComplete;
    private int consecutiveEmptyReads;

    @VisibleForTesting
    protected RssFeedScheduler scheduler;

    public RssStreamProvider() {
        this(RssStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("rss")), false);
    }

    public RssStreamProvider(boolean z) {
        this(RssStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("rss")), z);
    }

    public RssStreamProvider(RssStreamConfiguration rssStreamConfiguration) {
        this(rssStreamConfiguration, false);
    }

    public RssStreamProvider(RssStreamConfiguration rssStreamConfiguration, boolean z) {
        this.perpetual = z;
        this.config = rssStreamConfiguration;
    }

    public void setConfig(RssStreamConfiguration rssStreamConfiguration) {
        this.config = rssStreamConfiguration;
    }

    public void setRssFeeds(Set<String> set) {
        this.urlFeeds = set;
    }

    public void setRssFeeds(Map<String, Long> map) {
        if (this.config == null) {
            this.config = new RssStreamConfiguration();
        }
        LinkedList newLinkedList = Lists.newLinkedList();
        for (String str : map.keySet()) {
            Long l = map.get(str);
            FeedDetails feedDetails = new FeedDetails();
            feedDetails.setUrl(str);
            feedDetails.setPollIntervalMillis(l);
            newLinkedList.add(feedDetails);
        }
        this.config.setFeeds(newLinkedList);
    }

    public void startStream() {
        LOGGER.trace("Starting Rss Scheduler");
        this.executor.submit(this.scheduler);
    }

    public StreamsResultSet readCurrent() {
        ConcurrentLinkedQueue newConcurrentLinkedQueue = Queues.newConcurrentLinkedQueue();
        int i = 0;
        while (!this.dataQueue.isEmpty() && i < MAX_SIZE) {
            StreamsDatum streamsDatum = (StreamsDatum) ComponentUtils.pollWhileNotEmpty(this.dataQueue);
            if (streamsDatum != null) {
                i++;
                newConcurrentLinkedQueue.add(streamsDatum);
            }
        }
        this.isComplete.set(this.scheduler.isComplete() && newConcurrentLinkedQueue.isEmpty() && this.dataQueue.isEmpty());
        return new StreamsResultSet(newConcurrentLinkedQueue);
    }

    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    public boolean isRunning() {
        return !this.isComplete.get();
    }

    public void prepare(Object obj) {
        this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.dataQueue = Queues.newLinkedBlockingQueue();
        this.scheduler = getScheduler(this.dataQueue);
        this.isComplete = new AtomicBoolean(false);
        this.consecutiveEmptyReads = 0;
    }

    @VisibleForTesting
    protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> blockingQueue) {
        return this.perpetual ? new RssFeedScheduler(this.executor, this.config.getFeeds(), blockingQueue) : new RssFeedScheduler(this.executor, this.config.getFeeds(), blockingQueue, 0);
    }

    public void cleanUp() {
        this.scheduler.stop();
        ComponentUtils.shutdownExecutor(this.executor, 10, 10);
    }
}
