/*
 * Decompiled with CFR 0.152.
 */
package de.jungblut.crawl;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.Funnels;
import de.jungblut.crawl.Crawler;
import de.jungblut.crawl.FetchResult;
import de.jungblut.crawl.FetchResultPersister;
import de.jungblut.crawl.FetchThread;
import de.jungblut.crawl.ResultWriter;
import de.jungblut.crawl.SequenceFileResultWriter;
import de.jungblut.crawl.extraction.Extractor;
import de.jungblut.crawl.extraction.OutlinkExtractor;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Crawler that fetches batches of URLs in parallel on a fixed-size thread pool.
 *
 * <p>Starting from the seed URLs, links are taken from a FIFO frontier in batches and handed
 * to {@link FetchThread} tasks. The outlinks of every fetched result are appended to the
 * frontier unless a Bloom filter reports them as already seen, and every result is passed to
 * a {@link FetchResultPersister} that runs on its own thread.
 *
 * <p>Because a Bloom filter can report false positives, a small fraction of never-seen links
 * may be skipped.
 *
 * <p>An instance is single-use per {@link #setup}: {@link #process} consumes the fetch budget
 * and stops the persister thread when it finishes.
 *
 * @param <T> the concrete result type produced by the extractor
 */
public final class MultithreadedCrawler<T extends FetchResult>
implements Crawler<T> {
    private static final Logger LOG = LogManager.getLogger(MultithreadedCrawler.class);
    /** Default number of worker threads. */
    private static final int THREAD_POOL_SIZE = 32;
    /** Default number of URLs handed to a single fetch task. */
    private static final int BATCH_SIZE = 10;
    /** Pause between polls while waiting for a fetch task to complete. */
    private static final long POLL_SLEEP_MILLIS = 1000L;

    private Extractor<T> extractor;
    private FetchResultPersister<T> persister;
    private Thread persisterThread;
    /** Remaining URL budget; decremented by the batch size whenever a batch is submitted. */
    private int fetches = 100000;
    private int poolSize = THREAD_POOL_SIZE;
    private int batchSize = BATCH_SIZE;

    /**
     * Creates a crawler with an explicit pool and batch size and starts its persister thread.
     *
     * @param threadPoolSize number of worker threads, must be positive
     * @param batchSize number of URLs per fetch task, must be positive
     * @param fetches upper bound of URLs to submit for fetching
     * @param extractor extractor handed to every fetch task
     * @param writer sink the persister writes results to
     * @throws IllegalArgumentException if {@code threadPoolSize} or {@code batchSize} is not positive
     * @throws IOException declared by {@link #setup}
     */
    public MultithreadedCrawler(int threadPoolSize, int batchSize, int fetches, Extractor<T> extractor, ResultWriter<T> writer) throws IOException {
        // Validate before setup() so that no persister thread is started for an unusable crawler.
        if (threadPoolSize <= 0) {
            throw new IllegalArgumentException("threadPoolSize must be positive, but was " + threadPoolSize);
        }
        // A non-positive batch size would never submit anything and spin forever in process().
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, but was " + batchSize);
        }
        this.poolSize = threadPoolSize;
        this.batchSize = batchSize;
        this.setup(fetches, extractor, writer);
    }

    /**
     * Creates a crawler with the default pool size (32) and batch size (10) and starts its
     * persister thread.
     *
     * @param fetches upper bound of URLs to submit for fetching
     * @param extractor extractor handed to every fetch task
     * @param writer sink the persister writes results to
     * @throws IOException declared by {@link #setup}
     */
    public MultithreadedCrawler(int fetches, Extractor<T> extractor, ResultWriter<T> writer) throws IOException {
        this.setup(fetches, extractor, writer);
    }

    /**
     * Stores the crawl configuration, wraps the writer in a new persister and starts the
     * persister on a fresh thread.
     *
     * @param fetches upper bound of URLs to submit for fetching
     * @param extractor extractor handed to every fetch task
     * @param writer sink the persister writes results to
     */
    @Override
    public final void setup(int fetches, Extractor<T> extractor, ResultWriter<T> writer) throws IOException {
        this.fetches = fetches;
        this.extractor = extractor;
        this.persister = new FetchResultPersister<T>(writer);
        this.persisterThread = new Thread(this.persister);
        this.persisterThread.start();
    }

    /**
     * Crawls from the given seeds until the fetch budget is used up or no links remain, then
     * stops the persister and shuts the worker pool down.
     *
     * <p>The persister and the pool are released even when the crawl fails, so a failed fetch
     * task does not leave threads running.
     *
     * @param seedUrls URLs the crawl starts from
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a fetch task terminated with an exception
     */
    @Override
    public final void process(String ... seedUrls) throws InterruptedException, ExecutionException {
        long appStart = System.currentTimeMillis();
        LOG.info("Num sites to fetch " + this.fetches);
        ExecutorService threadPool = Executors.newFixedThreadPool(this.poolSize);
        try {
            this.crawl(threadPool, seedUrls);
        } finally {
            // Same order as on the regular path: drain the persister first, then kill the workers.
            try {
                this.persister.stop();
                this.persisterThread.join();
            } finally {
                threadPool.shutdownNow();
            }
        }
        LOG.info("Took overall time of " + (System.currentTimeMillis() - appStart) / 1000L + "s.");
    }

    /**
     * Runs the submit/collect loop on the given pool.
     *
     * NOTE(review): assumes FetchThread&lt;T&gt; is a Callable returning Set&lt;T&gt;, as implied
     * by how its result is consumed below - confirm against FetchThread.
     */
    private void crawl(ExecutorService threadPool, String[] seedUrls) throws InterruptedException, ExecutionException {
        LinkedList<String> linksToCrawl = new LinkedList<String>(Arrays.asList(seedUrls));
        Funnel<CharSequence> urlFunnel = Funnels.stringFunnel(Charset.defaultCharset());
        BloomFilter<CharSequence> visited = BloomFilter.create(urlFunnel, this.fetches);
        for (String seed : seedUrls) {
            visited.put(seed);
        }
        ExecutorCompletionService<Set<T>> completionService = new ExecutorCompletionService<Set<T>>(threadPool);
        int runningTasks = 0;
        do {
            int length = Math.min(linksToCrawl.size(), this.batchSize);
            if (this.fetches > 0 && length > 0) {
                // The budget counts submitted URLs, not successfully fetched ones.
                this.fetches -= length;
                completionService.submit(new FetchThread<T>(this.nextBatch(linksToCrawl, length), this.extractor));
                ++runningTasks;
            }
            // Block when there is nothing else to submit or the pool is saturated; otherwise
            // only peek so that further batches can be submitted.
            boolean mustWait = linksToCrawl.isEmpty() && runningTasks > 0 || runningTasks > this.poolSize;
            Future<Set<T>> finished = mustWait ? completionService.take() : completionService.poll();
            if (finished == null) {
                // With no task outstanding the loop is about to terminate; sleeping would only delay it.
                if (runningTasks > 0) {
                    Thread.sleep(POLL_SLEEP_MILLIS);
                }
                continue;
            }
            --runningTasks;
            Set<T> results = finished.get();
            if (results != null) {
                this.handleResults(results, linksToCrawl, visited);
            }
        } while ((this.fetches > 0 || runningTasks != 0) && (runningTasks != 0 || !linksToCrawl.isEmpty()));
    }

    /** Removes the first {@code length} links from the frontier and returns them as one batch. */
    private ArrayList<String> nextBatch(LinkedList<String> linksToCrawl, int length) {
        ArrayList<String> batch = new ArrayList<String>(length);
        for (int i = 0; i < length; ++i) {
            batch.add(linksToCrawl.poll());
        }
        return batch;
    }

    /** Queues the not-yet-seen outlinks of every result and hands each result to the persister. */
    private void handleResults(Set<T> results, LinkedList<String> linksToCrawl, BloomFilter<CharSequence> visited) {
        for (T result : results) {
            for (String out : result.outlinks) {
                if (visited.mightContain(out)) {
                    continue;
                }
                linksToCrawl.offer(out);
                visited.put(out);
            }
            this.persister.add(result);
        }
    }

    /**
     * Demo entry point: crawls up to 1000 URLs starting at a fixed seed, extracting outlinks
     * and writing the results with a {@link SequenceFileResultWriter}.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
        String seedUrl = "http://news.google.de/";
        new MultithreadedCrawler<FetchResult>(1000, new OutlinkExtractor(), new SequenceFileResultWriter()).process(seedUrl);
    }
}

