/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.others.kafka;

import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.wicp.tams.common.others.kafka.IConsumer;
import net.wicp.tams.common.others.kafka.KafkaTools;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Polls a single Kafka topic and hands every received record to an
 * {@link IConsumer} callback running on a bounded worker pool.
 *
 * <p>The value type {@code T} is recovered reflectively from the generic
 * superclass (see {@link #getTClass()}), so this class must be extended with a
 * concrete type argument, e.g. {@code new KafkaConsumerThread<String>(...) {}}.
 *
 * @param <T> type of the record values delivered to the callback
 */
public abstract class KafkaConsumerThread<T> {
    private final KafkaConsumer<String, T> consumer;
    private final String topic;
    private final IConsumer<T> doConsumer;
    private ExecutorService executor;

    /**
     * Builds the underlying consumer and subscribes it to {@code topic}.
     *
     * <p>Base properties come from {@code KafkaTools.getProps(false)}; the group
     * id, a string key deserializer and a value deserializer chosen by
     * {@code KafkaTools.getValueProp} for {@code T} are then set on top.
     *
     * @param groupId    value stored under the {@code group.id} property
     * @param topic      the single topic to subscribe to
     * @param doConsumer callback invoked once per received record
     */
    public KafkaConsumerThread(String groupId, String topic, IConsumer<T> doConsumer) {
        Properties props = KafkaTools.getProps(false);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", KafkaTools.getValueProp(this.getTClass(), false));
        this.consumer = new KafkaConsumer<String, T>(props);
        this.topic = topic;
        this.doConsumer = doConsumer;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Resolves the runtime class bound to {@code T} from the first type
     * argument of this object's generic superclass.
     *
     * @return the class bound to {@code T}
     * @throws ClassCastException if the direct superclass is not parameterized,
     *                            or its first type argument is not a plain class
     */
    @SuppressWarnings("unchecked") // The first type argument of the superclass is T by construction.
    public Class<T> getTClass() {
        ParameterizedType superType = (ParameterizedType) this.getClass().getGenericSuperclass();
        return (Class<T>) superType.getActualTypeArguments()[0];
    }

    /**
     * Runs the poll loop on the calling thread. This method does not return
     * normally; it only exits by propagating an exception thrown while polling
     * or submitting work.
     *
     * <p>Records are processed asynchronously, so a callback may still be
     * running when later polls are issued. The work queue holds at most 1000
     * pending records; once it is full, {@code CallerRunsPolicy} makes the
     * polling thread execute the callback itself, which throttles polling.
     *
     * <p>The worker pool created here is shut down when the loop exits; already
     * submitted records are still processed. The consumer itself is left open.
     *
     * @param threadNumber fixed number of worker threads
     * @throws IllegalArgumentException if {@code threadNumber} is not positive
     */
    public void start(int threadNumber) {
        // Keep a local reference so the finally block always shuts down the pool this call created.
        final ExecutorService workers = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        this.executor = workers;
        try {
            while (true) {
                ConsumerRecords<String, T> records = this.consumer.poll(100L);
                for (final ConsumerRecord<String, T> item : records) {
                    workers.submit(new Runnable() {
                        @Override
                        public void run() {
                            KafkaConsumerThread.this.doConsumer.doWithRecord(item);
                        }
                    });
                }
            }
        } finally {
            // Without this the non-daemon core threads would outlive a failed loop.
            workers.shutdown();
        }
    }
}

