/*
 * Decompiled with CFR 0.152.
 */
package net.kut3.messaging.kafka.client;

import ch.qos.logback.classic.Level;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import net.kut3.messaging.BatchMessageProcessor;
import net.kut3.messaging.Consumer;
import net.kut3.messaging.Message;
import net.kut3.messaging.MessageProcessor;
import net.kut3.messaging.ProcessResult;
import net.kut3.messaging.kafka.Component;
import net.kut3.messaging.kafka.OffsetResetMode;
import net.kut3.messaging.kafka.client.KafkaClientFactory;
import net.kut3.messaging.kafka.client.KafkaMessage;
import net.kut3.messaging.kafka.client.SimpleConsumerProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerImpl
implements Consumer,
Component {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerImpl.class);
    private final String name;
    private final boolean isAutoAck;
    private final KafkaConsumer<String, String> consumer;
    private final Collection<String> topics;

    ConsumerImpl(String name, Map<String, Object> props, Collection<String> topics) {
        this.name = name;
        this.isAutoAck = null != props.get("enable.auto.commit") ? (Boolean)props.get("enable.auto.commit") : true;
        LOGGER.info("Consumer '" + this.name + "' - " + "enable.auto.commit" + "=" + this.isAutoAck);
        this.consumer = new KafkaConsumer(props);
        this.topics = topics;
        LOGGER.info("Consumer '" + this.name + "' - topics=" + Arrays.toString(topics.toArray()));
    }

    public String name() {
        return this.name;
    }

    public boolean isAutoAck() {
        return this.isAutoAck;
    }

    /*
     * Unable to fully structure code
     */
    public void start(MessageProcessor messageProcessor) {
        this.consumer.subscribe(this.topics);
        block0: while (true) {
            if ((records = this.consumer.poll(Duration.ofMillis(10000L))).isEmpty()) {
                ConsumerImpl.LOGGER.info("Consumer '" + this.name + "' - No records found");
                continue;
            }
            var3_3 = records.iterator();
            while (true) {
                if (var3_3.hasNext()) ** break;
                continue block0;
                record = (ConsumerRecord)var3_3.next();
                messageProcessor.process((Message)new KafkaMessage((ConsumerRecord<String, String>)record));
            }
            break;
        }
    }

    public void start(BatchMessageProcessor messageProcessor) {
        this.consumer.subscribe(this.topics);
        while (true) {
            ConsumerRecords records;
            if ((records = this.consumer.poll(Duration.ofMillis(10000L))).isEmpty()) {
                LOGGER.info("Consumer '" + this.name + "' - No records found");
                continue;
            }
            for (ConsumerRecord record : records) {
                messageProcessor.addToBatch((Message)new KafkaMessage((ConsumerRecord<String, String>)record));
            }
            messageProcessor.processBatch();
        }
    }

    public void close() {
        this.consumer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaClientFactory clientFactory = new KafkaClientFactory();
        String producerName = "kafka-client-0.3.0-01";
        String servers = "10.1.1.99:9092,10.1.1.99:9093,10.1.1.98:9094";
        String groupId = "net.kut3.messaging.kafka.test-group";
        String topic = "dev.Merchant";
        ((ch.qos.logback.classic.Logger)LoggerFactory.getLogger((String)"org.apache.kafka")).setLevel(Level.ERROR);
        LoggerFactory.getLogger((String)topic).info("Begin");
        Consumer consumer = clientFactory.newConsumer(new SimpleConsumerProperties(producerName, servers, groupId, OffsetResetMode.EARLIEST, Arrays.asList(topic)));
        consumer.start(new BatchMessageProcessor(){

            public void addToBatch(Message message) {
                System.out.println(message.toString());
            }

            public List<ProcessResult> processBatch() {
                System.out.println("Done batch");
                return new ArrayList<ProcessResult>();
            }
        });
        consumer.close();
    }
}

