/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.others.kafka;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.others.kafka.IConsumer;
import net.wicp.tams.common.others.kafka.KafkaTools;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs several Kafka consumers, one per thread, that all join the same consumer group
 * and subscribe to the same topic. Every record that is polled is handed to the
 * supplied {@link IConsumer}.
 *
 * <p>The class is abstract so that a concrete subclass fixes the type argument
 * {@code T}; {@link #getTClass()} reads that argument reflectively and it is used to
 * select the value deserializer.
 *
 * @param <T> type of the record values delivered to the handler
 */
public abstract class KafkaConsumerGroup<T> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerGroup.class);

    /** Number of consumer threads created by the three-argument constructor. */
    private static final int DEFAULT_THREAD_COUNT = 3;

    /** Timeout in milliseconds passed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final List<KafkaConsumerGroupThread> consumerThreadList = new ArrayList<KafkaConsumerGroupThread>();
    private final String groupId;
    private final String topic;
    private final IConsumer<T> doConsumer;

    /**
     * Creates a group with the default number of consumers (3).
     *
     * @param groupId    value used for the {@code group.id} consumer property
     * @param topic      topic every consumer subscribes to
     * @param doConsumer handler invoked once for each polled record
     */
    public KafkaConsumerGroup(String groupId, String topic, IConsumer<T> doConsumer) {
        this(groupId, topic, doConsumer, DEFAULT_THREAD_COUNT);
    }

    /**
     * Creates a group with an explicit number of consumers. The consumers are created
     * and subscribed here; they do not start polling until {@link #start()} is called.
     *
     * @param groupId     value used for the {@code group.id} consumer property
     * @param topic       topic every consumer subscribes to
     * @param doConsumer  handler invoked once for each polled record
     * @param threadCount number of consumers (and threads) to create; must be at least 1
     * @throws IllegalArgumentException if {@code threadCount} is less than 1
     */
    public KafkaConsumerGroup(String groupId, String topic, IConsumer<T> doConsumer, int threadCount) {
        if (threadCount < 1) {
            throw new IllegalArgumentException("threadCount must be >= 1 but was " + threadCount);
        }
        // The fields must be assigned before the workers are built: the worker
        // constructor reads groupId and topic from the enclosing instance.
        this.groupId = groupId;
        this.topic = topic;
        this.doConsumer = doConsumer;
        for (int i = 0; i < threadCount; ++i) {
            this.consumerThreadList.add(new KafkaConsumerGroupThread());
        }
    }

    /**
     * Resolves the class bound to {@code T} by inspecting the generic superclass of the
     * runtime class.
     *
     * @return the first actual type argument of the generic superclass
     * @throws ClassCastException if the generic superclass is not parameterized, or if
     *                            its first type argument is not a plain {@link Class}
     */
    @SuppressWarnings("unchecked") // the generic superclass's first argument is what binds T
    public Class<T> getTClass() {
        ParameterizedType superType = (ParameterizedType) this.getClass().getGenericSuperclass();
        return (Class<T>) superType.getActualTypeArguments()[0];
    }

    /**
     * Starts one new thread per consumer created by the constructor.
     *
     * <p>Each call creates a fresh set of threads over the same consumers, so this is
     * meant to be called once per instance.
     */
    public void start() {
        for (KafkaConsumerGroupThread worker : this.consumerThreadList) {
            new Thread(worker).start();
        }
    }

    /**
     * A single consumer: builds and subscribes its own {@link KafkaConsumer} on
     * construction, then polls in an endless loop when run.
     */
    private class KafkaConsumerGroupThread
    implements Runnable {
        private final KafkaConsumer<String, T> kafkaConsumer;

        /**
         * Builds the consumer configuration from {@link KafkaTools}, logs it, creates
         * the consumer and subscribes it to the enclosing group's topic.
         */
        public KafkaConsumerGroupThread() {
            Properties props = KafkaTools.getProps(false);
            props.put("group.id", KafkaConsumerGroup.this.groupId);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", KafkaTools.getValueProp(KafkaConsumerGroup.this.getTClass(), false));
            // The escaped characters in the message below mean "parameters".
            // NOTE(review): every property value is written to the log at INFO level;
            // confirm the configuration never carries credentials.
            log.info("kafka consumer \u53c2\u6570:");
            for (Object key : props.keySet()) {
                log.info("{}:{}", key, props.get(key));
            }
            this.kafkaConsumer = new KafkaConsumer<String, T>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(KafkaConsumerGroup.this.topic));
        }

        /**
         * Polls forever, passing every record to the handler and logging the message of
         * the returned {@link Result}. The loop only ends when {@code poll} or the
         * handler throws; the failure is logged and rethrown, and the consumer is
         * closed on the way out.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, T> records = this.kafkaConsumer.poll(POLL_TIMEOUT_MS);
                    for (ConsumerRecord<String, T> consumerRecord : records) {
                        Result result = KafkaConsumerGroup.this.doConsumer.doWithRecord(consumerRecord);
                        // A handler that returns nothing must not kill the polling thread.
                        if (result != null) {
                            log.info(result.getMessage());
                        }
                    }
                }
            } catch (RuntimeException e) {
                log.error("kafka consumer thread stopped, group=" + KafkaConsumerGroup.this.groupId
                        + ", topic=" + KafkaConsumerGroup.this.topic, e);
                throw e;
            } finally {
                // Release the consumer's resources once this thread stops polling.
                this.kafkaConsumer.close();
            }
        }
    }
}

