package com.github.imrafaelmerino.kafkacli;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import jio.ExceptionFun;
import jio.cli.ConsoleLogger;
import jio.cli.ConsolePrinter;
import jsonvalues.JsObj;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/github/imrafaelmerino/kafkacli/KafkaConsumers.class */
class KafkaConsumers implements Function<String, KafkaConsumer<Object, Object>> {
    private final Map<String, KafkaConsumer<Object, Object>> consumers = new HashMap();
    private final ExecutorService service = Executors.newCachedThreadPool();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaConsumers() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            Iterator<KafkaConsumer<Object, Object>> it = this.consumers.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                } catch (Exception e) {
                    ConsoleLogger.log("Exception closing consumer during shutdown hook: %s".formatted(e));
                }
            }
        }));
    }

    private static void printRecords(String str, List<String> list, ConsumerRecords<Object, Object> consumerRecords, boolean z) {
        ConsolePrinter.printlnResult("\nReceived %d records from topics `%s` in consumer `%s`%n".formatted(Integer.valueOf(consumerRecords.count()), list, str));
        if (z) {
            StringBuilder sb = new StringBuilder();
            Iterator it = consumerRecords.iterator();
            int i = 1;
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                int i2 = i;
                i++;
                sb.append(String.format("Record %d:%n", Integer.valueOf(i2)));
                sb.append(String.format("  Offset: %d%n", Long.valueOf(consumerRecord.offset())));
                Object[] objArr = new Object[1];
                objArr[0] = consumerRecord.key() != null ? consumerRecord.key() : "null";
                sb.append(String.format("  Key: %s%n", objArr));
                sb.append(String.format("  Value: %s%n", consumerRecord.value()));
                sb.append(String.format("  Partition: %d%n", Integer.valueOf(consumerRecord.partition())));
                sb.append(String.format("  Timestamp: %d%n", Long.valueOf(consumerRecord.timestamp())));
                sb.append("\n");
            }
            ConsolePrinter.printlnResult(sb.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isStarted(String str) {
        return this.consumers.containsKey(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startConsumer(JsObj jsObj, String str, JsObj jsObj2, List<String> list, Duration duration, boolean z) {
        Properties properties = Fun.toProperties(jsObj);
        Properties properties2 = Fun.toProperties(jsObj2);
        properties2.putAll(properties);
        KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(properties2);
        kafkaConsumer.subscribe(list);
        this.consumers.put(str, kafkaConsumer);
        Future submit = this.service.submit(() -> {
            while (true) {
                try {
                    ConsumerRecords poll = kafkaConsumer.poll(duration);
                    if (!poll.isEmpty()) {
                        printRecords(str, list, poll, z);
                    }
                } catch (Exception e) {
                    ConsolePrinter.printlnError("Exception while fetching records from Kafka: %s ".formatted(ExceptionFun.findUltimateCause(e)));
                }
            }
        });
        if (!$assertionsDisabled && submit == null) {
            throw new AssertionError();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void commitAsync(String str) {
        this.consumers.get(str).commitAsync((map, exc) -> {
            if (exc == null) {
                ConsolePrinter.printlnResult("Commit request from consumer `%s` completed".formatted(str));
            } else {
                ConsolePrinter.printlnError("Commit request from consumer `%s` failed: %s".formatted(str, ExceptionFun.findUltimateCause(exc)));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopConsumer(String str) {
        KafkaConsumer<Object, Object> kafkaConsumer = this.consumers.get(str);
        if (kafkaConsumer != null) {
            kafkaConsumer.close();
            this.consumers.remove(str);
        }
    }

    @Override // java.util.function.Function
    public KafkaConsumer<Object, Object> apply(String str) {
        return this.consumers.get(str);
    }

    static {
        $assertionsDisabled = !KafkaConsumers.class.desiredAssertionStatus();
    }
}
