package com.github.shoothzj.kafka.consumer;

import com.github.shoothzj.kafka.KafkaConstant;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/shoothzj/kafka/consumer/ConsumerDemo.class */
public class ConsumerDemo {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);

    public KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KafkaConstant.kafkaIpList.stream().map(str -> {
            return str + ":9092";
        }).collect(Collectors.joining(",")));
        properties.put("group.id", "ShootHzj");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

    public void subscribeExample() {
        createConsumer().subscribe(Collections.singletonList("Topic"));
        createConsumer().subscribe(Pattern.compile("test.*"));
    }

    public void consumeExample() {
        KafkaConsumer<String, String> createConsumer = createConsumer();
        while (true) {
            try {
                createConsumer.poll(Duration.ofMillis(100L)).forEach(consumerRecord -> {
                    log.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
                });
            } catch (Throwable th) {
                createConsumer.close();
                throw th;
            }
        }
    }

    public void consumeSyncCommit() {
        KafkaConsumer<String, String> createConsumer = createConsumer();
        while (true) {
            createConsumer.poll(Duration.ofMillis(100L)).forEach(consumerRecord -> {
                log.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
            });
            try {
                createConsumer.commitSync();
            } catch (Exception e) {
                log.error("commit failed", e);
            }
        }
    }

    public void consumeAsyncCommit() {
        KafkaConsumer<String, String> createConsumer = createConsumer();
        while (true) {
            createConsumer.poll(Duration.ofMillis(100L)).forEach(consumerRecord -> {
                log.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
            });
            createConsumer.commitAsync();
        }
    }

    public void consumeAsyncCommitWithCallback() {
        KafkaConsumer<String, String> createConsumer = createConsumer();
        while (true) {
            createConsumer.poll(Duration.ofMillis(100L)).forEach(consumerRecord -> {
                log.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
            });
            createConsumer.commitAsync((map, exc) -> {
                if (exc != null) {
                    log.error("Commit failed for offsets {}", map, exc);
                }
            });
        }
    }

    public void onPartitionRevoked() {
        final HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        final KafkaConsumer<String, String> createConsumer = createConsumer();
        try {
            try {
                createConsumer.subscribe(arrayList, new ConsumerRebalanceListener() { // from class: com.github.shoothzj.kafka.consumer.ConsumerDemo.1
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        ConsumerDemo.log.info("Lost partitions in rebalance. Committing offsets: {}", hashMap);
                        createConsumer.commitSync(hashMap);
                    }
                });
                while (true) {
                    createConsumer.poll(Duration.ofMillis(100L)).forEach(consumerRecord -> {
                        log.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
                        hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
                    });
                    createConsumer.commitAsync(hashMap, (OffsetCommitCallback) null);
                }
            } catch (WakeupException e) {
                try {
                    createConsumer.commitSync(hashMap);
                    createConsumer.close();
                    log.info("Close consumer and we are done");
                } finally {
                }
            } catch (Exception e2) {
                log.error("Unexpected error", e2);
                try {
                    createConsumer.commitSync(hashMap);
                    createConsumer.close();
                    log.info("Close consumer and we are done");
                } finally {
                    createConsumer.close();
                    log.info("Close consumer and we are done");
                }
            }
        } catch (Throwable th) {
            try {
                createConsumer.commitSync(hashMap);
                createConsumer.close();
                log.info("Close consumer and we are done");
                throw th;
            } finally {
            }
        }
    }

    public void onReassignSeek() {
        final KafkaConsumer<String, String> createConsumer = createConsumer();
        createConsumer.subscribe(new ArrayList(), new ConsumerRebalanceListener() { // from class: com.github.shoothzj.kafka.consumer.ConsumerDemo.2
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                ConsumerDemo.this.commitDBTransaction();
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                for (TopicPartition topicPartition : collection) {
                    createConsumer.seek(topicPartition, ConsumerDemo.this.getOffsetFromDB(topicPartition));
                }
            }
        });
        createConsumer.poll(Duration.ofMillis(0L));
        for (TopicPartition topicPartition : createConsumer.assignment()) {
            createConsumer.seek(topicPartition, getOffsetFromDB(topicPartition));
        }
        while (true) {
            createConsumer.poll(Duration.ofMillis(100L)).forEach(new Consumer<ConsumerRecord<String, String>>() { // from class: com.github.shoothzj.kafka.consumer.ConsumerDemo.3
                @Override // java.util.function.Consumer
                public void accept(ConsumerRecord<String, String> consumerRecord) {
                    processRecord(consumerRecord);
                    storeRecordInDB(consumerRecord);
                    storeOffsetInDB(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                }

                private void storeOffsetInDB(String str, int i, long j) {
                }

                private void storeRecordInDB(ConsumerRecord<String, String> consumerRecord) {
                }

                private void processRecord(ConsumerRecord<String, String> consumerRecord) {
                }
            });
            commitDBTransaction();
        }
    }

    private void commitDBTransaction() {
    }

    private long getOffsetFromDB(TopicPartition topicPartition) {
        return 0L;
    }

    public void assignPartitions() {
        KafkaConsumer<String, String> createConsumer = createConsumer();
        List<PartitionInfo> partitionsFor = createConsumer.partitionsFor("topic");
        HashSet hashSet = new HashSet();
        if (partitionsFor == null) {
            return;
        }
        for (PartitionInfo partitionInfo : partitionsFor) {
            hashSet.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        createConsumer.assign(hashSet);
        while (true) {
            createConsumer.poll(Duration.ofMillis(1000L)).forEach(consumerRecord -> {
                log.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
            });
            createConsumer.commitSync();
        }
    }
}
