package net.csini.spring.kafka.observable;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.csini.spring.kafka.mapping.JsonKeyDeserializer;
import net.csini.spring.kafka.observable.SimpleKafkaEntityObservable;
import net.csini.spring.kafka.util.KafkaEntityUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/* loaded from: input_file:net/csini/spring/kafka/observable/KafkaEntityPollingRunnable.class */
public class KafkaEntityPollingRunnable<T, K> implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEntityPollingRunnable.class);
    private final String groupid;
    private final Class<T> clazz;
    private final Class<K> clazzKey;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicReference<SimpleKafkaEntityObservable.KafkaEntityObservableDisposable<T, K>[]> subscribers;
    private final List<String> bootstrapServers;
    private final String beanName;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaEntityPollingRunnable(String str, Class<T> cls, Class<K> cls2, AtomicReference<SimpleKafkaEntityObservable.KafkaEntityObservableDisposable<T, K>[]> atomicReference, List<String> list, String str2) {
        this.groupid = str;
        this.clazz = cls;
        this.clazzKey = cls2;
        this.subscribers = atomicReference;
        this.bootstrapServers = list;
        this.beanName = str2;
    }

    @Override // java.lang.Runnable
    public void run() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.bootstrapServers);
        hashMap.put("group.id", this.groupid);
        hashMap.put("enable.auto.commit", false);
        JsonDeserializer jsonDeserializer = new JsonDeserializer(getClazz());
        JsonKeyDeserializer jsonKeyDeserializer = new JsonKeyDeserializer(getClazzKey());
        jsonDeserializer.addTrustedPackages(new String[]{getClazz().getPackageName()});
        jsonKeyDeserializer.addTrustedPackages(new String[]{getClazzKey().getPackageName()});
        KafkaConsumer kafkaConsumer = new KafkaConsumer(hashMap, jsonKeyDeserializer, jsonDeserializer);
        try {
            kafkaConsumer.subscribe(List.of(KafkaEntityUtil.getTopicName(getClazz())));
            kafkaConsumer.poll(Duration.ofSeconds(10L));
            kafkaConsumer.seekToEnd(Collections.emptyList());
            kafkaConsumer.commitSync();
            LocalDateTime now = LocalDateTime.now();
            while (kafkaConsumer.committed(kafkaConsumer.assignment()).isEmpty()) {
                if (ChronoUnit.SECONDS.between(now, LocalDateTime.now()) >= 60) {
                    throw new RuntimeException("KafkaConsumer is not ready in 60sec.");
                }
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("started " + this.beanName + "...");
            }
            while (!this.stopped.get()) {
                this.started.set(true);
                if (this.subscribers.get().length > 0) {
                    LOGGER.info("POLL-" + this.groupid + " to " + this.subscribers.get().length + " subscribers");
                    ConsumerRecords poll = kafkaConsumer.poll(Duration.ofSeconds(10L));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("poll.count: " + poll.count());
                    }
                    poll.forEach(consumerRecord -> {
                        if (LOGGER.isTraceEnabled()) {
                            LOGGER.trace("polled:" + String.valueOf(consumerRecord));
                        }
                        for (SimpleKafkaEntityObservable.KafkaEntityObservableDisposable kafkaEntityObservableDisposable : this.subscribers.get()) {
                            kafkaEntityObservableDisposable.onNext(consumerRecord.value());
                        }
                    });
                    kafkaConsumer.commitSync();
                }
            }
            kafkaConsumer.unsubscribe();
            this.started.set(false);
            kafkaConsumer.close();
        } catch (Throwable th) {
            try {
                kafkaConsumer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public Class<T> getClazz() {
        return this.clazz;
    }

    public Class<K> getClazzKey() {
        return this.clazzKey;
    }

    public AtomicBoolean getStopped() {
        return this.stopped;
    }

    public AtomicBoolean getStarted() {
        return this.started;
    }
}
