package org.apache.streams.rss.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/rss/provider/RssStreamProvider.class */
/**
 * {@link StreamsProvider} that schedules RSS feed polling through an
 * {@link RssFeedScheduler} and hands the collected {@link StreamsDatum}s to
 * callers in batches via {@link #readCurrent()}.
 *
 * <p>Lifecycle: {@link #prepare(Object)} must be called before
 * {@link #startStream()}, {@link #readCurrent()} or {@link #isRunning()},
 * because it creates the executor, the data queue, the scheduler and the
 * completion flag those methods use.
 */
public class RssStreamProvider implements StreamsProvider {
    public static final String STREAMS_ID = "RssStreamProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);

    /** Upper bound on the number of datums returned by a single {@link #readCurrent()} call. */
    private static final int MAX_SIZE = 1000;

    private RssStreamConfiguration config;
    private final boolean perpetual;
    private ExecutorService executor;
    private BlockingQueue<StreamsDatum> dataQueue;
    private AtomicBoolean isComplete;

    @VisibleForTesting
    protected RssFeedScheduler scheduler;

    /**
     * Creates a non-perpetual provider configured from the {@code rss} section
     * of the configuration returned by {@link StreamsConfigurator#getConfig()}.
     */
    public RssStreamProvider() {
        this(detectDefaultConfiguration(), false);
    }

    /**
     * Creates a provider configured from the {@code rss} section of the
     * configuration returned by {@link StreamsConfigurator#getConfig()}.
     *
     * @param z whether the provider runs in perpetual mode
     */
    public RssStreamProvider(boolean z) {
        this(detectDefaultConfiguration(), z);
    }

    /**
     * Creates a non-perpetual provider with the given configuration.
     *
     * @param rssStreamConfiguration the configuration holding the feeds to poll
     */
    public RssStreamProvider(RssStreamConfiguration rssStreamConfiguration) {
        this(rssStreamConfiguration, false);
    }

    /**
     * Creates a provider with the given configuration and mode.
     *
     * @param rssStreamConfiguration the configuration holding the feeds to poll
     * @param z whether the provider runs in perpetual mode
     */
    public RssStreamProvider(RssStreamConfiguration rssStreamConfiguration, boolean z) {
        this.perpetual = z;
        this.config = rssStreamConfiguration;
    }

    /** Detects the RSS configuration from the {@code rss} path of the global streams config. */
    private static RssStreamConfiguration detectDefaultConfiguration() {
        return (RssStreamConfiguration) new ComponentConfigurator(RssStreamConfiguration.class)
                .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss"));
    }

    /**
     * Returns the identifier of this provider.
     *
     * @return {@link #STREAMS_ID}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Replaces the configuration used by this provider.
     *
     * @param rssStreamConfiguration the new configuration
     */
    public void setConfig(RssStreamConfiguration rssStreamConfiguration) {
        this.config = rssStreamConfiguration;
    }

    /**
     * Currently a no-op: the supplied URLs are ignored.
     *
     * <p>NOTE(review): the intent is not visible here; use
     * {@link #setRssFeeds(Map)} to actually register feeds.
     *
     * @param set feed URLs (unused)
     */
    public void setRssFeeds(Set<String> set) {
    }

    /**
     * Replaces the configured feeds with one {@link FeedDetails} per map entry.
     * A configuration object is created first if none has been set.
     *
     * @param map feed URL mapped to its poll interval in milliseconds
     */
    public void setRssFeeds(Map<String, Long> map) {
        if (this.config == null) {
            this.config = new RssStreamConfiguration();
        }
        ArrayList<FeedDetails> feeds = new ArrayList<>(map.size());
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            FeedDetails feedDetails = new FeedDetails();
            feedDetails.setUrl(entry.getKey());
            feedDetails.setPollIntervalMillis(entry.getValue());
            feeds.add(feedDetails);
        }
        this.config.setFeeds(feeds);
    }

    /**
     * Submits the scheduler to the executor. Requires a prior call to
     * {@link #prepare(Object)}.
     */
    public void startStream() {
        LOGGER.trace("Starting Rss Scheduler");
        this.executor.submit(this.scheduler);
    }

    /**
     * Drains up to {@link #MAX_SIZE} datums from the internal queue and
     * updates the completion flag.
     *
     * <p>The provider is marked complete only when the scheduler reports
     * completion, this batch is empty and the internal queue is empty.
     *
     * @return a result set wrapping the drained datums (possibly empty)
     */
    public StreamsResultSet readCurrent() {
        ConcurrentLinkedQueue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
        int drained = 0;
        while (!this.dataQueue.isEmpty() && drained < MAX_SIZE) {
            StreamsDatum datum = (StreamsDatum) ComponentUtils.pollWhileNotEmpty(this.dataQueue);
            if (datum != null) {
                drained++;
                batch.add(datum);
            }
        }
        this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty());
        return new StreamsResultSet(batch);
    }

    /**
     * Not supported by this provider.
     *
     * @param bigInteger ignored
     * @return always {@code null}
     */
    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    /**
     * Not supported by this provider.
     *
     * @param dateTime ignored
     * @param dateTime2 ignored
     * @return always {@code null}
     */
    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    /**
     * Reports whether the provider may still produce data.
     *
     * @return {@code true} until {@link #readCurrent()} has marked the provider complete
     */
    public boolean isRunning() {
        return !this.isComplete.get();
    }

    /**
     * Creates the executor, the data queue, the scheduler and the completion
     * flag. The executor is created first because {@link #getScheduler} passes
     * it to the scheduler.
     *
     * @param obj ignored
     */
    public void prepare(Object obj) {
        this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.dataQueue = new LinkedBlockingQueue<>();
        this.scheduler = getScheduler(this.dataQueue);
        this.isComplete = new AtomicBoolean(false);
    }

    /**
     * Builds the scheduler that feeds the given queue.
     *
     * @param blockingQueue the queue the scheduler writes datums to
     * @return a scheduler built with the three-argument constructor in
     *         perpetual mode, otherwise with the four-argument constructor
     *         and a final argument of {@code 0}
     */
    @VisibleForTesting
    protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> blockingQueue) {
        if (this.perpetual) {
            return new RssFeedScheduler(this.executor, this.config.getFeeds(), blockingQueue);
        }
        // NOTE(review): the meaning of the trailing 0 is defined by RssFeedScheduler — confirm there.
        return new RssFeedScheduler(this.executor, this.config.getFeeds(), blockingQueue, 0);
    }

    /**
     * Stops the scheduler and shuts down the executor. Safe to call even if
     * {@link #prepare(Object)} was never invoked.
     */
    public void cleanUp() {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
        if (this.executor != null) {
            ComponentUtils.shutdownExecutor(this.executor, 10, 10);
        }
    }

    /**
     * Command-line entry point: runs the provider with the given
     * configuration file and writes each datum's document as one JSON line to
     * the output file.
     *
     * @param strArr {@code strArr[0]} is the configuration file path,
     *               {@code strArr[1]} is the output file path
     * @throws Exception if the configuration cannot be loaded or the output
     *                   file cannot be opened
     */
    public static void main(String[] strArr) throws Exception {
        Preconditions.checkArgument(strArr.length >= 2, "usage: RssStreamProvider <configfile> <outfile>");
        String configPath = strArr[0];
        String outputPath = strArr[1];
        Config reference = ConfigFactory.load();
        File configFile = new File(configPath);
        assert configFile.exists();
        Config typesafe = ConfigFactory.parseFileAnySyntax(configFile, ConfigParseOptions.defaults().setAllowMissing(false))
                .withFallback(reference)
                .resolve();
        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
        RssStreamConfiguration rssConfiguration = (RssStreamConfiguration) new ComponentConfigurator(RssStreamConfiguration.class)
                .detectConfiguration(typesafe, "rss");
        RssStreamProvider provider = new RssStreamProvider(rssConfiguration);
        StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
        // try-with-resources closes (and therefore flushes) the output file even if polling fails.
        try (PrintStream out = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputPath)))) {
            provider.prepare(rssConfiguration);
            try {
                provider.startStream();
                do {
                    Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
                    for (StreamsDatum datum : provider.readCurrent()) {
                        try {
                            out.println(mapper.writeValueAsString(datum.getDocument()));
                        } catch (JsonProcessingException e) {
                            // A single unserializable document must not abort the whole run.
                            System.err.println(e.getMessage());
                        }
                    }
                } while (provider.isRunning());
            } finally {
                // Always release the scheduler and executor threads once prepared.
                provider.cleanUp();
            }
            out.flush();
        }
    }
}
