/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.others.kafka;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.others.kafka.IConsumer;
import net.wicp.tams.common.others.kafka.KafkaTools;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaConsumerThread<T> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);
    private final KafkaConsumer<String, T> consumer;
    private final String topic;
    private final IConsumer<T> doConsumer;
    private ExecutorService executor;
    private int batchNum = Integer.parseInt(Conf.get((String)"common.others.kafka.consumer.batch.num"));
    private long timeout = Long.parseLong(Conf.get((String)"common.others.kafka.consumer.batch.timeout"));

    public KafkaConsumerThread(String groupId, String topic, IConsumer<T> doConsumer) {
        Properties props = KafkaTools.getProps(false);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", KafkaTools.getValueProp(this.getTClass(), false));
        this.consumer = new KafkaConsumer(props);
        this.topic = topic;
        this.doConsumer = doConsumer;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public KafkaConsumerThread(String topic, IConsumer<T> doConsumer) {
        this(Conf.get((String)"common.others.kafka.consumer.group.id"), topic, doConsumer);
    }

    public Class<T> getTClass() {
        Class tClass = (Class)((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        return tClass;
    }

    public void start(int threadNumber) {
        this.executor = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        final ArrayList<ConsumerRecord> buffer = new ArrayList<ConsumerRecord>();
        long maxTime = this.timeout * 3L;
        long startTime = System.currentTimeMillis();
        while (true) {
            final ConsumerRecords consumerRecords = this.consumer.poll(this.timeout);
            for (ConsumerRecord consumerRecord : consumerRecords) {
                buffer.add(consumerRecord);
            }
            long time2 = System.currentTimeMillis();
            if (buffer.size() < this.batchNum && time2 - startTime <= maxTime) continue;
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    Result doWithRecords = KafkaConsumerThread.this.doConsumer.doWithRecords(buffer);
                    KafkaTools.errorlog(consumerRecords, doWithRecords, log);
                }
            });
            this.consumer.commitSync();
            buffer.clear();
            log.info("\u4ecekafka\u53d6\u6570\u636e\u7528\u65f6\uff1a{},\u6570\u91cf\uff1a{}", (Object)(time2 - startTime), (Object)buffer.size());
            startTime = System.currentTimeMillis();
        }
    }
}

