package com.antbrains.urlcrawler.crawler;

import com.antbrains.urlcrawler.db.CrawlTask;
import com.antbrains.urlcrawler.db.HbaseTool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/antbrains/urlcrawler/crawler/Writer.class */
public class Writer extends Thread {
    protected static Logger logger = Logger.getLogger(Writer.class);
    BlockingQueue<CrawlTask> resQueue;
    Connection hbaseConn;
    String dbName;
    private volatile boolean bStop;
    int batchSize = 100;
    ArrayList<CrawlTask> cache = new ArrayList<>(this.batchSize);
    long updateInterval = 60000;
    long lastUpdate;

    public Writer(String str, BlockingQueue<CrawlTask> blockingQueue, String str2, String str3) throws Exception {
        this.resQueue = blockingQueue;
        this.dbName = str;
        Configuration create = HBaseConfiguration.create();
        create.set("hbase.zookeeper.quorum", str2);
        if (str3 != null) {
            create.set("hbase.zookeeper.property.clientPort", str3);
        }
        this.hbaseConn = ConnectionFactory.createConnection(create);
    }

    public void stopMe() {
        logger.info("receive stop signal");
        this.bStop = true;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.lastUpdate = System.currentTimeMillis();
        while (!this.bStop) {
            try {
                CrawlTask poll = this.resQueue.poll(3L, TimeUnit.SECONDS);
                if (poll != null) {
                    this.cache.add(poll);
                    if (this.cache.size() >= this.batchSize) {
                        flushCache();
                    }
                } else if (System.currentTimeMillis() - this.lastUpdate > this.updateInterval) {
                    flushCache();
                }
            } catch (InterruptedException e) {
            }
        }
        flushCache();
        try {
            this.hbaseConn.close();
        } catch (IOException e2) {
            logger.error(e2.getMessage(), e2);
        }
        logger.info("stopped");
    }

    private void flushCache() {
        try {
            HbaseTool.updateWebPage(this.dbName, this.hbaseConn, this.cache);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            Iterator<CrawlTask> it = this.cache.iterator();
            while (it.hasNext()) {
                CrawlTask next = it.next();
                arrayList3.add(next.crawlUrl);
                if (next.status == 2) {
                    arrayList2.add(next);
                } else if (next.status == 3) {
                    arrayList.add(next.crawlUrl);
                } else {
                    logger.warn("algo bug: " + next);
                }
            }
            HbaseTool.addRows(this.dbName, HbaseTool.TB_URLDB_SUCC, this.hbaseConn, arrayList);
            HbaseTool.addFailed(this.dbName, this.hbaseConn, arrayList2);
            HbaseTool.delRows(this.dbName, HbaseTool.TB_URLDB_CRAWLING, this.hbaseConn, arrayList3);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        this.cache.clear();
        this.lastUpdate = System.currentTimeMillis();
    }
}
