package net.csini.spring.kafka.observer;

import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import net.csini.spring.kafka.KafkaEntityKey;
import net.csini.spring.kafka.KafkaEntityObserver;
import net.csini.spring.kafka.exception.KafkaEntityException;
import net.csini.spring.kafka.mapping.JsonKeySerializer;
import net.csini.spring.kafka.util.KafkaEntityUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.support.serializer.JsonSerializer;

/* loaded from: input_file:net/csini/spring/kafka/observer/SimpleKafkaEntityObserver.class */
/**
 * RxJava {@link Observer} that publishes every entity it receives to a Kafka topic.
 *
 * <p>The topic name is resolved from the entity class via {@link KafkaEntityUtil#getTopicName},
 * and the record key is read reflectively from the first declared field annotated with
 * {@link KafkaEntityKey}. Each {@code onNext} blocks until the broker acknowledges the record.
 *
 * <p>When the {@link KafkaEntityObserver} annotation requests transactional mode, a Kafka
 * transaction is begun in {@link #onSubscribe}, committed in {@link #onComplete} and aborted in
 * {@link #onError}.
 *
 * <p>This class does no synchronization of its own; it relies on the caller delivering Observer
 * signals one at a time.
 *
 * @param <T> entity (record value) type
 * @param <K> record key type
 */
public final class SimpleKafkaEntityObserver<T, K> implements Observer<T>, DisposableBean, InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaEntityObserver.class);

    // NOTE(review): the producer configuration is built inside the constructor, so it reads the
    // initializer value below. Field-level @Value injection would normally happen only after
    // construction, so the injected property is presumably never seen by the producer - TODO confirm.
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
    private final String clientid;
    private final KafkaProducer<K, T> kafkaProducer;
    private final Class<T> clazz;
    private final String topic;
    // Not assigned anywhere in this class; kept because it is package-visible.
    Throwable error;
    /** Field annotated with {@link KafkaEntityKey}, or {@code null} when the entity declares none. */
    private final Field keyField;
    private final boolean transactional;
    /** Upstream handle of the current subscription; {@code null} when none is active. */
    private Disposable upstream;
    /** True between a successful {@code beginTransaction} and the matching commit/abort. */
    private boolean transactionOpen;

    /**
     * Creates an observer for the entity described by the given annotation.
     *
     * @param kafkaEntityObserver annotation supplying the entity class and the transactional flag
     * @param str                 Kafka client id; also used as prefix of the transactional id
     * @return a new observer with its producer already created
     * @throws KafkaEntityException declared by the constructor (see there)
     */
    @CheckReturnValue
    @NonNull
    public static <T, K> SimpleKafkaEntityObserver<T, K> create(KafkaEntityObserver kafkaEntityObserver, String str) throws KafkaEntityException {
        return new SimpleKafkaEntityObserver<>(kafkaEntityObserver, str);
    }

    /**
     * Resolves topic and key field, builds the producer and, in transactional mode, initializes
     * transactions. If transaction initialization fails the producer is closed before the
     * exception is rethrown, so no producer resources are left behind.
     *
     * @param kafkaEntityObserver annotation supplying the entity class and the transactional flag
     * @param str                 Kafka client id; also used as prefix of the transactional id
     * @throws KafkaEntityException presumably raised by the topic-name lookup - TODO confirm
     */
    SimpleKafkaEntityObserver(KafkaEntityObserver kafkaEntityObserver, String str) throws KafkaEntityException {
        this.clazz = kafkaEntityObserver.entity();
        this.clientid = str;
        this.topic = KafkaEntityUtil.getTopicName(this.clazz);
        this.transactional = kafkaEntityObserver.transactional();
        this.keyField = findKeyField(this.clazz);

        HashMap<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", this.bootstrapServers);
        config.put("key.serializer", JsonKeySerializer.class);
        config.put("value.serializer", JsonSerializer.class);
        config.put("client.id", this.clientid);
        if (this.transactional) {
            config.put("transactional.id", str + "-transactional-id");
            config.put("enable.idempotence", "true");
        }
        this.kafkaProducer = new KafkaProducer<>(config, new JsonKeySerializer(), new JsonSerializer());
        if (this.transactional) {
            LOGGER.info("initTransactions-begin {}", str);
            try {
                this.kafkaProducer.initTransactions();
            } catch (RuntimeException e) {
                // The instance is never handed out, so nobody else could close the producer.
                try {
                    this.kafkaProducer.close();
                } catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            LOGGER.info("initTransactions-end {}", str);
        }
    }

    /** Returns the first declared field carrying {@link KafkaEntityKey}, made accessible, or {@code null}. */
    private static Field findKeyField(Class<?> entityClass) {
        for (Field field : entityClass.getDeclaredFields()) {
            LOGGER.debug("    field  -> {}", field.getName());
            if (field.isAnnotationPresent(KafkaEntityKey.class)) {
                field.setAccessible(true);
                return field;
            }
        }
        return null;
    }

    /** Reads the record key from the entity's key field. */
    @SuppressWarnings("unchecked") // the key field's runtime type is only known reflectively
    private K extractKey(T t) throws IllegalAccessException {
        if (this.keyField == null) {
            throw new IllegalStateException("No @KafkaEntityKey field declared on " + this.clazz.getName());
        }
        return (K) this.keyField.get(t);
    }

    /**
     * Remembers the upstream handle and, in transactional mode, begins a Kafka transaction.
     *
     * @param disposable handle used to cancel the upstream if a send fails
     */
    @Override
    public void onSubscribe(Disposable disposable) {
        LOGGER.trace("sending events");
        this.upstream = disposable;
        if (this.transactional) {
            this.kafkaProducer.beginTransaction();
            this.transactionOpen = true;
        }
    }

    /**
     * Sends the entity to the topic and blocks until the send completes.
     *
     * <p>A failed send cancels the upstream and is routed to {@link #onError} with the underlying
     * cause (not wrapped in additional {@link ExecutionException} layers).
     *
     * @param t entity to publish; must not be {@code null}
     */
    @Override
    public void onNext(T t) {
        ExceptionHelper.nullCheck(t, "onNext called with a null value.");
        try {
            this.kafkaProducer.send(new ProducerRecord<>(this.topic, extractKey(t), t)).get();
        } catch (IllegalAccessException e) {
            failAndCancel(e);
        } catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            failAndCancel(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            failAndCancel(cause != null ? cause : e);
        }
    }

    /** Cancels the upstream so no further items arrive, then runs the regular error handling. */
    private void failAndCancel(Throwable failure) {
        Disposable current = this.upstream;
        this.upstream = null;
        try {
            if (current != null) {
                current.dispose();
            }
        } finally {
            onError(failure);
        }
    }

    /**
     * Reports the error through {@link RxJavaPlugins#onError} and aborts the transaction if one
     * is currently open. The abort is attempted even if the reporting call throws.
     *
     * @param th the failure; must not be {@code null}
     */
    @Override
    public void onError(Throwable th) {
        ExceptionHelper.nullCheck(th, "onError called with a null Throwable.");
        this.upstream = null;
        try {
            RxJavaPlugins.onError(th);
        } finally {
            if (this.transactionOpen) {
                // Cleared first so a repeated error signal cannot abort twice.
                this.transactionOpen = false;
                this.kafkaProducer.abortTransaction();
            }
        }
    }

    /** Commits the transaction if one is currently open; otherwise does nothing. */
    @Override
    public void onComplete() {
        this.upstream = null;
        if (this.transactionOpen) {
            this.kafkaProducer.commitTransaction();
            // Cleared only after a successful commit so a failed commit can still be aborted.
            this.transactionOpen = false;
        }
    }

    /** No initialization is performed here; the producer is created in the constructor. */
    @Override
    public void afterPropertiesSet() throws Exception {
    }

    /** Closes the underlying Kafka producer. */
    @Override
    public void destroy() throws Exception {
        this.kafkaProducer.close();
    }
}
