/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.others.kafka;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.apiext.ReflectAssist;
import net.wicp.tams.common.others.constant.SeekPosition;
import net.wicp.tams.common.others.kafka.IConsumer;
import net.wicp.tams.common.others.kafka.KafkaAssitInst;
import net.wicp.tams.common.others.kafka.KafkaTools;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaConsumerGroup<T> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerGroup.class);
    private List<KafkaConsumerGroupThread> consumerThreadList = new ArrayList<KafkaConsumerGroupThread>();
    private String groupId;
    private String topic;
    private IConsumer<T> doConsumer;
    private int batchNum = Integer.parseInt(Conf.get((String)"common.others.kafka.consumer.batch.num"));
    private long timeout = Long.parseLong(Conf.get((String)"common.others.kafka.consumer.batch.timeout"));

    public KafkaConsumerGroup(String groupId, String topic, IConsumer<T> doConsumer, int hosts) {
        this.groupId = groupId;
        this.topic = topic;
        this.doConsumer = doConsumer;
        Class classz = ReflectAssist.getSuperClassGenricType(this.getClass());
        KafkaProducer kafkaProducer = KafkaAssitInst.getInst().getKafkaProducer(classz);
        List partitions = kafkaProducer.partitionsFor(topic);
        log.info("topic======{},partitions size====={}", (Object)topic, (Object)partitions.size());
        int consumerNum = partitions.size() / hosts + (partitions.size() % hosts > 0 ? 1 : 0);
        log.info("consumerNum====={}", (Object)consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            KafkaConsumerGroupThread consumerThread = new KafkaConsumerGroupThread();
            this.consumerThreadList.add(consumerThread);
        }
    }

    public KafkaConsumerGroup(String topic, IConsumer<T> doConsumer, int hosts) {
        this(Conf.get((String)"common.others.kafka.consumer.group.id"), topic, doConsumer, hosts);
    }

    public void seekPotion(SeekPosition seekPosition, Long position) {
        for (KafkaConsumerGroupThread kafkaConsumer : this.consumerThreadList) {
            kafkaConsumer.seekPotion(seekPosition, position);
        }
    }

    public Class<T> getTClass() {
        Class tClass = (Class)((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        return tClass;
    }

    public void start() {
        for (KafkaConsumerGroupThread item : this.consumerThreadList) {
            Thread thread = new Thread(item);
            thread.start();
        }
    }

    private class KafkaConsumerGroupThread
    implements Runnable {
        private KafkaConsumer<String, T> kafkaConsumer;

        public KafkaConsumerGroupThread(SeekPosition seekPosition, Long position) {
            Properties props = KafkaTools.getProps(false);
            props.put("group.id", KafkaConsumerGroup.this.groupId);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", KafkaTools.getValueProp(KafkaConsumerGroup.this.getTClass(), false));
            props.put("max.poll.records", (Object)(KafkaConsumerGroup.this.batchNum + 1));
            log.info("kafka consumer \u53c2\u6570:");
            for (Object propele : props.keySet()) {
                log.info("{}:{}", propele, props.get(propele));
            }
            this.kafkaConsumer = new KafkaConsumer(props);
            this.kafkaConsumer.subscribe(Arrays.asList(KafkaConsumerGroup.this.topic));
            this.seekPotion(seekPosition, position);
        }

        public void seekPotion(SeekPosition seekPosition, Long position) {
            List partitions = this.kafkaConsumer.partitionsFor(KafkaConsumerGroup.this.topic);
            if (CollectionUtils.isNotEmpty((Collection)partitions) && seekPosition != SeekPosition.no) {
                ArrayList<TopicPartition> ptlist = new ArrayList<TopicPartition>();
                for (PartitionInfo partitionInfo : partitions) {
                    ptlist.add(new TopicPartition(KafkaConsumerGroup.this.topic, partitionInfo.partition()));
                }
                this.kafkaConsumer.poll(0L);
                switch (seekPosition) {
                    case begin: {
                        this.kafkaConsumer.seekToBeginning(ptlist);
                        break;
                    }
                    case end: {
                        this.kafkaConsumer.seekToEnd(ptlist);
                        break;
                    }
                    case user: {
                        if (position == null || position <= 0L) break;
                        for (TopicPartition topicPartition : ptlist) {
                            this.kafkaConsumer.seek(topicPartition, position.longValue());
                        }
                        break;
                    }
                }
            }
        }

        public KafkaConsumerGroupThread() {
            this(SeekPosition.no, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ArrayList buffer = new ArrayList();
            long maxTime = KafkaConsumerGroup.this.timeout * 3L;
            long startTime = System.currentTimeMillis();
            while (true) {
                ConsumerRecords consumerRecords = this.kafkaConsumer.poll(KafkaConsumerGroup.this.timeout);
                for (ConsumerRecord consumerRecord : consumerRecords) {
                    buffer.add(consumerRecord);
                }
                long time2 = System.currentTimeMillis();
                if (buffer.size() < KafkaConsumerGroup.this.batchNum && (time2 - startTime <= maxTime || buffer.size() <= 0)) continue;
                Result doWithRecord = null;
                try {
                    doWithRecord = KafkaConsumerGroup.this.doConsumer.doWithRecords(buffer);
                }
                catch (Throwable e) {
                    log.error("\u4e1a\u52a1\u5904\u7406\u5931\u8d25", e);
                    doWithRecord = Result.getError((String)e.getMessage());
                }
                finally {
                    KafkaTools.errorlog(consumerRecords, doWithRecord, log);
                }
                if (doWithRecord.isSuc()) {
                    try {
                        this.kafkaConsumer.commitSync();
                    }
                    catch (Throwable e) {
                        log.error("commit error", e);
                    }
                }
                log.info("from kafka server,the time:{},records:{}", (Object)(time2 - startTime), (Object)buffer.size());
                buffer.clear();
                startTime = System.currentTimeMillis();
            }
        }
    }
}

