package io.simplesource.kafka.internal.client;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaConsumerRunner.class */
/**
 * Starts a Kafka consumer on its own thread and forwards every record read from a
 * topic to a caller-supplied callback.
 */
public final class KafkaConsumerRunner {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunner.class);

    /**
     * Poll loop that subscribes to a single topic and hands each record to a receiver,
     * keyed by the UUID found in the last 36 characters of the record key.
     *
     * @param <R> type of the record values
     */
    static class RunnableConsumer<R> implements Runnable {
        /** Number of trailing key characters that are parsed as a UUID. */
        private static final int UUID_LENGTH = 36;

        private final KafkaConsumer<String, R> consumer;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final String topicName;
        private final BiConsumer<UUID, R> receiver;

        /**
         * @param properties consumer configuration passed to the {@link KafkaConsumer}
         * @param serde      supplies the value deserializer; keys are read as strings
         * @param str        name of the topic to subscribe to
         * @param biConsumer callback invoked with the parsed UUID and the record value
         */
        RunnableConsumer(Properties properties, Serde<R> serde, String str, BiConsumer<UUID, R> biConsumer) {
            this.topicName = str;
            this.receiver = biConsumer;
            this.consumer = new KafkaConsumer<>(properties, Serdes.String().deserializer(), serde.deserializer());
        }

        /**
         * Polls the topic until {@link #close()} is called. The consumer is always closed
         * when the loop exits, whatever the reason.
         */
        @Override
        public void run() {
            try {
                this.consumer.subscribe(Collections.singletonList(this.topicName));
                while (!this.closed.get()) {
                    this.consumer.poll(Duration.ofSeconds(1L))
                            .forEach(rec -> dispatch(rec.key(), rec.value()));
                }
            } catch (WakeupException e) {
                // A wakeup is the expected shutdown signal only after close() was requested.
                if (!this.closed.get()) {
                    throw e;
                }
            } catch (Exception e) {
                // Pass the throwable itself so the stack trace and cause are not lost.
                KafkaConsumerRunner.logger.error("Consumer for topic {} terminated unexpectedly", this.topicName, e);
            } finally {
                this.consumer.close();
            }
        }

        /**
         * Forwards one record to the receiver. Records whose key does not end in a
         * parseable UUID are logged and skipped so a single bad key cannot end the loop.
         */
        private void dispatch(String key, R value) {
            UUID id = parseTrailingUuid(key);
            if (id == null) {
                KafkaConsumerRunner.logger.warn(
                        "Skipping record on topic {} whose key does not end with a UUID: {}", this.topicName, key);
                return;
            }
            this.receiver.accept(id, value);
        }

        /**
         * Parses the last {@value #UUID_LENGTH} characters of {@code key} as a UUID.
         *
         * @return the parsed UUID, or {@code null} when the key is null, too short or malformed
         */
        private static UUID parseTrailingUuid(String key) {
            if (key == null || key.length() < UUID_LENGTH) {
                return null;
            }
            try {
                return UUID.fromString(key.substring(key.length() - UUID_LENGTH));
            } catch (IllegalArgumentException ignored) {
                // UUID.fromString offers no non-throwing validation; treat as "no UUID present".
                return null;
            }
        }

        /** Marks the consumer as closed and wakes it up so a blocked poll returns. */
        void close() {
            this.closed.set(true);
            this.consumer.wakeup();
        }
    }

    KafkaConsumerRunner() {
    }

    /**
     * Copies a configuration map into a {@link Properties} object, converting each value
     * with {@code toString()}.
     *
     * @param map configuration entries to copy
     * @return a new {@link Properties} holding the stringified entries
     */
    private static Properties copyProperties(Map<String, Object> map) {
        Properties properties = new Properties();
        map.forEach((key, value) -> {
            // Properties cannot hold null values; skip them instead of failing with an NPE.
            if (value != null) {
                properties.setProperty(key, value.toString());
            }
        });
        return properties;
    }

    /**
     * Creates a consumer for {@code str} and starts it on a new thread. The supplied
     * configuration is copied and its {@code group.id} is replaced with
     * {@code response_consumer_} followed by the first 8 characters of a random UUID.
     *
     * @param map        base consumer configuration
     * @param str        name of the topic to consume
     * @param serde      serde used to deserialize record values
     * @param biConsumer callback invoked for every record with the key's trailing UUID and the value
     * @param <R>        type of the record values
     * @return a subscription handle bound to the consumer's {@code close()} method
     */
    public static <R> ResponseSubscription run(Map<String, Object> map, String str, Serde<R> serde, BiConsumer<UUID, R> biConsumer) {
        Properties consumerProperties = copyProperties(map);
        consumerProperties.setProperty("group.id", String.format("response_consumer_%s", UUID.randomUUID().toString().substring(0, 8)));
        RunnableConsumer<R> runnableConsumer = new RunnableConsumer<>(consumerProperties, serde, str, biConsumer);
        new Thread(runnableConsumer).start();
        return runnableConsumer::close;
    }
}
