package org.apache.streams.rss.provider;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.google.common.collect.Queues;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/rss/provider/RssStreamProviderTest.class */
public class RssStreamProviderTest extends RandomizedTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTest.class);

    /* loaded from: input_file:org/apache/streams/rss/provider/RssStreamProviderTest$MockScheduler.class */
    private class MockScheduler extends RssFeedScheduler {
        private BlockingQueue<StreamsDatum> queue;
        private CountDownLatch latch;
        private volatile boolean complete;

        public MockScheduler(CountDownLatch countDownLatch, BlockingQueue<StreamsDatum> blockingQueue) {
            super((ExecutorService) null, (List) null, blockingQueue);
            this.complete = false;
            this.latch = countDownLatch;
            this.queue = blockingQueue;
        }

        public void run() {
            for (int i = 0; i < 20; i++) {
                try {
                    this.queue.put(new StreamsDatum((Object) null));
                    Thread.sleep(RandomizedTest.randomIntBetween(0, 5000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    this.complete = true;
                    this.latch.countDown();
                }
            }
        }

        public boolean isComplete() {
            return this.complete;
        }
    }

    @Test
    public void testRssFeedShutdownsNonPerpetual() throws Exception {
        RssStreamProvider rssStreamProvider = null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            LinkedBlockingQueue newLinkedBlockingQueue = Queues.newLinkedBlockingQueue();
            rssStreamProvider = new RssStreamProvider(new RssStreamConfiguration()) { // from class: org.apache.streams.rss.provider.RssStreamProviderTest.1
                protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> blockingQueue) {
                    return new MockScheduler(countDownLatch, blockingQueue);
                }
            };
            rssStreamProvider.prepare((Object) null);
            int i = 0;
            rssStreamProvider.startStream();
            while (!rssStreamProvider.scheduler.isComplete()) {
                StreamsResultSet readCurrent = rssStreamProvider.readCurrent();
                LOGGER.debug("Batch size : {}", Integer.valueOf(readCurrent.size()));
                i += readCurrent.size();
                Thread.sleep(randomIntBetween(0, 3000));
            }
            countDownLatch.await();
            StreamsResultSet readCurrent2 = rssStreamProvider.readCurrent();
            LOGGER.debug("Batch size : {}", Integer.valueOf(readCurrent2.size()));
            int size = i + readCurrent2.size();
            if (readCurrent2.size() != 0) {
                assertEquals(0L, rssStreamProvider.readCurrent().size());
            }
            assertTrue(rssStreamProvider.scheduler.isComplete());
            assertEquals(20L, size);
            assertFalse(rssStreamProvider.isRunning());
            assertEquals(0L, newLinkedBlockingQueue.size());
            rssStreamProvider.cleanUp();
            if (rssStreamProvider != null) {
                rssStreamProvider.cleanUp();
            }
        } catch (Throwable th) {
            if (rssStreamProvider != null) {
                rssStreamProvider.cleanUp();
            }
            throw th;
        }
    }
}
