package net.csini.spring.kafka.observable;

import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.lang.reflect.Field;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.csini.spring.kafka.KafkaEntityKey;
import net.csini.spring.kafka.KafkaEntityObservable;
import net.csini.spring.kafka.exception.KafkaEntityException;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:net/csini/spring/kafka/observable/SimpleKafkaEntityObservable.class */
/**
 * An RxJava {@link Observable} that keeps its current subscribers in a shared, atomically
 * replaced array and hands that array reference to a {@link KafkaEntityPollingRunnable}
 * which is run on a dedicated thread.
 *
 * <p>The instance is created through {@link #create(KafkaEntityObservable, String)}; the
 * polling thread is started from the constructor, and {@link #destroy()} asks it to stop and
 * then deletes the Kafka consumer group named after the bean.
 *
 * <p>NOTE(review): the polling runnable presumably consumes Kafka records and calls
 * {@code onNext} on every registered {@link KafkaEntityObservableDisposable} - confirm in
 * {@code KafkaEntityPollingRunnable}, which is not part of this file.
 *
 * @param <T> type of the emitted entity
 * @param <K> type of the entity field annotated with {@link KafkaEntityKey}
 */
public class SimpleKafkaEntityObservable<T, K> extends Observable<T> implements DisposableBean, InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaEntityObservable.class);

    /**
     * Sentinel array: {@link #add} refuses new subscribers while {@link #subscribers} holds
     * this exact instance. Nothing in this class installs it - presumably the polling
     * runnable does; confirm there. Compared by identity, never by content.
     */
    @SuppressWarnings("rawtypes")
    static final KafkaEntityObservableDisposable[] TERMINATED = new KafkaEntityObservableDisposable[0];

    /** Sentinel array meaning "no subscribers"; compared by identity. */
    @SuppressWarnings("rawtypes")
    static final KafkaEntityObservableDisposable[] EMPTY = new KafkaEntityObservableDisposable[0];

    /** Maximum time {@link #start()} waits for the polling runnable to report "started". */
    private static final long START_TIMEOUT_SECONDS = 300L;

    /** Maximum time {@link #destroy()} waits for the polling runnable to clear "started". */
    private static final long STOP_TIMEOUT_SECONDS = 20L;

    /** Pause between two checks of the "started" flag, so waiting does not spin a CPU core. */
    private static final long STATE_POLL_INTERVAL_MILLIS = 50L;

    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    /*
     * NOTE(review): Spring injects @Value fields only after the constructor has returned, but
     * the constructor already passes this list to the polling runnable and starts the thread.
     * The runnable therefore receives the default list instance created here, while destroy()
     * reads whatever was injected later. Confirm whether that is intended.
     */
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

    /** Consumer group id; set to the same value as {@link #beanName}. */
    private String groupid;

    private Class<T> clazz;

    /** Type of the first declared field carrying {@link KafkaEntityKey}; {@code null} if none. */
    private Class<K> clazzKey;

    private final KafkaEntityPollingRunnable<T, K> pollingRunnable;

    private String beanName;

    /** Current subscribers; replaced as a whole by compare-and-set in {@link #add}/{@link #remove}. */
    final AtomicReference<KafkaEntityObservableDisposable<T, K>[]> subscribers;

    /**
     * Error delivered to late subscribers when {@link #add} is refused. Never assigned inside
     * this class.
     */
    Throwable error;

    /**
     * Links one downstream {@link Observer} to its parent observable. The inherited
     * {@link AtomicBoolean} value is the "disposed" flag: {@code true} once
     * {@link #dispose()} has run.
     *
     * @param <T> type of the emitted entity
     * @param <K> key type of the parent observable
     */
    public static final class KafkaEntityObservableDisposable<T, K> extends AtomicBoolean implements Disposable {
        private static final long serialVersionUID = 3562861878281475070L;

        final Observer<? super T> downstream;
        final SimpleKafkaEntityObservable<T, K> parent;

        KafkaEntityObservableDisposable(Observer<? super T> observer, SimpleKafkaEntityObservable<T, K> simpleKafkaEntityObservable) {
            this.downstream = observer;
            this.parent = simpleKafkaEntityObservable;
        }

        /**
         * Forwards an item downstream unless this subscription is disposed.
         *
         * @param t the item to deliver
         */
        public void onNext(T t) {
            if (!get()) {
                this.downstream.onNext(t);
            }
        }

        /**
         * Forwards an error downstream, or routes it to {@link RxJavaPlugins#onError} when
         * this subscription is already disposed so the error is not lost silently.
         *
         * @param th the error to deliver
         */
        public void onError(Throwable th) {
            if (get()) {
                RxJavaPlugins.onError(th);
            } else {
                this.downstream.onError(th);
            }
        }

        /** Forwards completion downstream unless this subscription is disposed. */
        public void onComplete() {
            if (!get()) {
                this.downstream.onComplete();
            }
        }

        /** Marks this subscription disposed and, on the first call only, unregisters it from the parent. */
        @Override
        public void dispose() {
            if (compareAndSet(false, true)) {
                this.parent.remove(this);
            }
        }

        @Override
        public boolean isDisposed() {
            return get();
        }
    }

    /**
     * Reads the entity type from the annotation, determines the key type, creates the polling
     * runnable and starts it, blocking until it reports "started".
     *
     * @param kafkaEntityObservable annotation instance supplying the entity class
     * @param str                   bean name; also used as the consumer group id
     * @throws KafkaEntityException if the consumer does not start in time or the wait is interrupted
     */
    @SuppressWarnings("unchecked") // entity()/field types are only known reflectively
    SimpleKafkaEntityObservable(KafkaEntityObservable kafkaEntityObservable, String str) throws KafkaEntityException {
        this.clazz = (Class<T>) kafkaEntityObservable.entity();
        this.groupid = str;
        this.beanName = str;
        this.clazzKey = (Class<K>) findKeyType(this.clazz);
        this.subscribers = new AtomicReference<>(EMPTY);
        this.pollingRunnable = new KafkaEntityPollingRunnable<>(this.groupid, this.clazz, this.clazzKey, this.subscribers, this.bootstrapServers, str);
        start();
    }

    /**
     * Returns the type of the first declared field annotated with {@link KafkaEntityKey}.
     * Only the field's type is read, so the field is not made accessible.
     *
     * @param entityType class whose declared fields are scanned
     * @return the key field's type, or {@code null} when no field carries the annotation
     */
    private static Class<?> findKeyType(Class<?> entityType) {
        for (Field field : entityType.getDeclaredFields()) {
            LOGGER.debug("    field  -> {}", field.getName());
            if (field.isAnnotationPresent(KafkaEntityKey.class)) {
                return field.getType();
            }
        }
        return null;
    }

    /**
     * Starts the polling runnable on a thread named {@code <beanName>Thread} and waits until
     * the runnable reports "started".
     *
     * @throws KafkaEntityException if "started" is not reported within
     *                              {@value #START_TIMEOUT_SECONDS} seconds, or if the waiting
     *                              thread is interrupted
     */
    private void start() throws KafkaEntityException {
        if (this.pollingRunnable.getStarted().get()) {
            LOGGER.warn("already started...");
            return;
        }
        LOGGER.debug("starting {}...", this.beanName);

        Thread thread = new Thread(this.pollingRunnable);
        thread.setName(this.beanName + "Thread");
        thread.start();

        LOGGER.info("waiting the consumer to start in {}Thread...", this.beanName);
        boolean started;
        try {
            started = awaitStartedFlag(true, START_TIMEOUT_SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the checked exception type is fixed.
            Thread.currentThread().interrupt();
            throw new KafkaEntityException(this.beanName, "Interrupted while waiting for the KafkaConsumer to start.");
        }
        if (!started) {
            throw new KafkaEntityException(this.beanName, "KafkaConsumer could not start in " + START_TIMEOUT_SECONDS + " sec.");
        }
    }

    /**
     * Waits until the polling runnable's "started" flag equals {@code expected}, sleeping
     * between checks. Elapsed time is measured with {@link System#nanoTime()} so wall-clock
     * adjustments cannot shorten or lengthen the timeout.
     *
     * @param expected       flag value to wait for
     * @param timeoutSeconds maximum time to wait
     * @return {@code true} if the flag reached {@code expected}, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private boolean awaitStartedFlag(boolean expected, long timeoutSeconds) throws InterruptedException {
        final long begin = System.nanoTime();
        final long timeoutNanos = timeoutSeconds * NANOS_PER_SECOND;
        while (this.pollingRunnable.getStarted().get() != expected) {
            if (System.nanoTime() - begin >= timeoutNanos) {
                return false;
            }
            Thread.sleep(STATE_POLL_INTERVAL_MILLIS);
        }
        return true;
    }

    /** No initialisation is performed here; the polling thread is started by the constructor. */
    @Override
    public void afterPropertiesSet() throws Exception {
    }

    /**
     * Signals the polling runnable to stop, waits up to {@value #STOP_TIMEOUT_SECONDS} seconds
     * for its "started" flag to clear, then deletes the consumer group. Deletion is best
     * effort: failures are logged and not propagated.
     */
    @Override
    public void destroy() throws Exception {
        LOGGER.warn("deleting a Consumer Group for {}", this.beanName);
        this.pollingRunnable.getStopped().set(true);
        LOGGER.info("waiting polling to stop");

        // Remember an interrupt instead of re-raising it at once, so the group deletion
        // below still gets a chance to run; the flag is restored before returning.
        boolean interrupted = false;
        try {
            if (!awaitStartedFlag(false, STOP_TIMEOUT_SECONDS)) {
                LOGGER.warn("polling of {} did not stop in {} sec.", this.beanName, STOP_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            interrupted = true;
        }

        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("request.timeout.ms", "1000");
        properties.put("default.api.timeout.ms", "5000");

        // try-with-resources closes the client on the failure path as well.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            adminClient.deleteConsumerGroups(Collections.singletonList(this.groupid)).all().get();
        } catch (InterruptedException e) {
            interrupted = true;
            LOGGER.error("interrupted while deleting consumer group {}", this.groupid, e);
        } catch (Exception e) {
            LOGGER.error("could not delete consumer group {}: {}", this.groupid, e.getMessage(), e);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the observable and starts its polling thread.
     *
     * @param kafkaEntityObservable annotation instance supplying the entity class
     * @param str                   bean name; also used as the consumer group id
     * @param <T>                   type of the emitted entity
     * @param <K>                   type of the entity's key field
     * @return the started observable
     * @throws KafkaEntityException if the consumer does not start in time
     */
    @CheckReturnValue
    @NonNull
    public static <T, K> SimpleKafkaEntityObservable<T, K> create(KafkaEntityObservable kafkaEntityObservable, String str) throws KafkaEntityException {
        return new SimpleKafkaEntityObservable<>(kafkaEntityObservable, str);
    }

    /**
     * Registers the observer. If registration is refused (the subscriber array is
     * {@link #TERMINATED}), the observer is terminated immediately with {@link #error} or,
     * when that is {@code null}, with {@code onComplete}.
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        KafkaEntityObservableDisposable<T, K> subscription = new KafkaEntityObservableDisposable<>(observer, this);
        observer.onSubscribe(subscription);
        if (add(subscription)) {
            // The observer may have disposed inside onSubscribe, before the subscription was
            // registered; in that case dispose()'s own remove() found nothing, so undo here.
            if (subscription.isDisposed()) {
                remove(subscription);
            }
            return;
        }
        Throwable failure = this.error;
        if (failure != null) {
            observer.onError(failure);
        } else {
            observer.onComplete();
        }
    }

    /**
     * Appends a subscription to the subscriber array using a compare-and-set retry loop.
     *
     * @param kafkaEntityObservableDisposable the subscription to register
     * @return {@code false} if the array is {@link #TERMINATED}, {@code true} once added
     */
    boolean add(KafkaEntityObservableDisposable<T, K> kafkaEntityObservableDisposable) {
        while (true) {
            KafkaEntityObservableDisposable<T, K>[] current = this.subscribers.get();
            if (current == TERMINATED) {
                return false;
            }
            KafkaEntityObservableDisposable<T, K>[] grown = Arrays.copyOf(current, current.length + 1);
            grown[current.length] = kafkaEntityObservableDisposable;
            if (this.subscribers.compareAndSet(current, grown)) {
                return true;
            }
        }
    }

    /**
     * Removes a subscription from the subscriber array using a compare-and-set retry loop.
     * Does nothing when the array is {@link #TERMINATED} or {@link #EMPTY}, or when the
     * subscription is not present.
     *
     * @param kafkaEntityObservableDisposable the subscription to unregister (matched by identity)
     */
    @SuppressWarnings("unchecked") // generic array creation; elements are only ever KafkaEntityObservableDisposable<T, K>
    void remove(KafkaEntityObservableDisposable<T, K> kafkaEntityObservableDisposable) {
        while (true) {
            KafkaEntityObservableDisposable<T, K>[] current = this.subscribers.get();
            if (current == TERMINATED || current == EMPTY) {
                return;
            }
            int size = current.length;
            int index = -1;
            for (int i = 0; i < size; i++) {
                if (current[i] == kafkaEntityObservableDisposable) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return;
            }
            KafkaEntityObservableDisposable<T, K>[] shrunk;
            if (size == 1) {
                // Reuse the shared sentinel so the identity check above keeps working.
                shrunk = EMPTY;
            } else {
                shrunk = new KafkaEntityObservableDisposable[size - 1];
                System.arraycopy(current, 0, shrunk, 0, index);
                System.arraycopy(current, index + 1, shrunk, index, size - index - 1);
            }
            if (this.subscribers.compareAndSet(current, shrunk)) {
                return;
            }
        }
    }
}
