package com.github.lhotari.reactive.pulsar.producercache;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.lhotari.reactive.pulsar.internal.adapter.AdapterImplementationFactory;
import com.github.lhotari.reactive.pulsar.resourceadapter.ProducerCacheKey;
import com.github.lhotari.reactive.pulsar.resourceadapter.PublisherTransformer;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveProducerCache;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/github/lhotari/reactive/pulsar/producercache/CaffeineReactiveProducerCache.class */
/**
 * {@link ReactiveProducerCache} implementation that keeps Pulsar {@link Producer} instances in a
 * Caffeine {@link AsyncCache}, keyed by {@link ProducerCacheKey}.
 *
 * <p>Callers borrow a producer through {@link #usingCachedProducer} or
 * {@link #usingCachedProducerMany}; each borrow is tracked as a lease on the cache entry. When
 * Caffeine removes an entry, the entry's producer is flushed and closed as soon as no lease is
 * active.
 */
public class CaffeineReactiveProducerCache implements ReactiveProducerCache, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CaffeineReactiveProducerCache.class);
    final AsyncCache<ProducerCacheKey, ProducerCacheEntry> cache;

    /**
     * Cache value holding one producer, the Mono used to (re)create it, the number of active
     * leases and the per-entry {@link PublisherTransformer} applied to producer actions.
     */
    public static class ProducerCacheEntry {
        private final AtomicReference<Producer<?>> producer = new AtomicReference<>();
        private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference<>();
        private final AtomicInteger activeLeases = new AtomicInteger(0);
        private final PublisherTransformer producerActionTransformer;
        // Set once the cache has removed this entry; cleanup is deferred while leases are active.
        private volatile boolean removed;

        /**
         * Creates an entry for an already created producer.
         *
         * @param producer the producer to hold
         * @param supplier supplies the transformer applied to producer actions; when {@code null},
         *                 {@link PublisherTransformer#identity()} is used
         */
        public ProducerCacheEntry(Producer<?> producer, Supplier<PublisherTransformer> supplier) {
            this.producer.set(producer);
            // The initial "creator" simply hands out whatever producer is currently held.
            this.producerCreator.set(Mono.fromSupplier(this.producer::get));
            this.producerActionTransformer = supplier != null ? supplier.get() : PublisherTransformer.identity();
        }

        /** Registers one more active user of this entry. */
        void activateLease() {
            this.activeLeases.incrementAndGet();
        }

        /**
         * Unregisters one user of this entry; when this was the last lease and the entry has
         * already been removed from the cache, the entry's resources are cleaned up.
         */
        void releaseLease() {
            // NOTE(review): close() and releaseLease() can both observe "removed && no leases" and
            // both run cleanupResources(); closeProducer() is CAS-guarded, but dispose() may be
            // invoked twice - confirm PublisherTransformer.dispose() is idempotent.
            if (this.activeLeases.decrementAndGet() == 0 && this.removed) {
                cleanupResources();
            }
        }

        /**
         * @return the current number of active leases
         */
        public int getActiveLeases() {
            return this.activeLeases.get();
        }

        /**
         * Returns the currently held producer viewed with the caller's message type.
         *
         * @param <T> message type the caller expects; not verified at runtime
         * @return the held producer, or {@code null} when none is currently held
         */
        @SuppressWarnings("unchecked") // the producer is stored untyped; the caller chooses T
        public <T> Producer<T> getProducer() {
            return (Producer<T>) this.producer.get();
        }

        /**
         * Ensures this entry holds a connected producer.
         *
         * <p>If the held producer is connected, this entry is emitted immediately. If it is not
         * connected, the creator Mono is swapped for a fresh cached one built from {@code mono},
         * and the old producer is detached and flushed/closed asynchronously. The returned Mono
         * then waits for the creator to emit a connected producer, re-subscribing up to 5 times
         * with a one second delay (Reactor's {@code repeatWhenEmpty} signals an error once the
         * maximum number of repeats is exceeded).
         *
         * @param mono creates a new producer when subscribed
         * @param <T>  message type of the producer
         * @return a Mono emitting this entry once a connected producer is available
         */
        public <T> Mono<ProducerCacheEntry> recreateIfClosed(Mono<Producer<T>> mono) {
            return Mono.defer(() -> {
                Producer<?> current = this.producer.get();
                if (current != null) {
                    if (current.isConnected()) {
                        return Mono.just(this);
                    }
                    Mono<? extends Producer<?>> previousCreator = this.producerCreator.get();
                    if (this.producerCreator.compareAndSet(previousCreator, createCachedProducerMono(mono))) {
                        // This caller installed the replacement creator, so it retires the old producer.
                        this.producer.compareAndSet(current, null);
                        flushAndCloseProducerAsync(current);
                    }
                }
                return Mono.defer(() -> this.producerCreator.get())
                        .filter(Producer::isConnected)
                        .repeatWhenEmpty(5, repeatSignals -> repeatSignals.delayElements(Duration.ofSeconds(1L)))
                        .thenReturn(this);
            });
        }

        /**
         * Wraps {@code mono} so that the producer it emits is stored in this entry and the result
         * is cached for later subscribers.
         */
        private <T> Mono<Producer<T>> createCachedProducerMono(Mono<Producer<T>> mono) {
            return mono.doOnNext(newProducer -> {
                log.info("Replaced closed producer for topic {}", newProducer.getTopic());
                this.producer.set(newProducer);
            }).cache();
        }

        /**
         * Marks this entry as removed from the cache and cleans up immediately when no lease is
         * active; otherwise cleanup happens when the last lease is released.
         */
        void close() {
            this.removed = true;
            if (this.activeLeases.get() == 0) {
                cleanupResources();
            }
        }

        /** Closes the held producer (if any) and always disposes the action transformer. */
        private void cleanupResources() {
            try {
                closeProducer();
            } finally {
                this.producerActionTransformer.dispose();
            }
        }

        /**
         * Detaches the held producer and flushes/closes it asynchronously. Does nothing when no
         * producer is held or another thread detached it first.
         */
        private void closeProducer() {
            Producer<?> current = this.producer.get();
            if (current == null || !this.producer.compareAndSet(current, null)) {
                return;
            }
            log.info("Closed producer {} for topic {}", current.getProducerName(), current.getTopic());
            flushAndCloseProducerAsync(current);
        }

        /** Applies this entry's transformer to a multi-value producer action. */
        <R> Publisher<? extends R> decorateProducerAction(Flux<R> flux) {
            return this.producerActionTransformer.transform(flux);
        }

        /** Applies this entry's transformer to a single-value producer action. */
        <R> Mono<? extends R> decorateProducerAction(Mono<R> mono) {
            return Mono.from(this.producerActionTransformer.transform(mono));
        }
    }

    /**
     * Creates a cache whose entries expire 1 minute after last access and 10 minutes after
     * creation, holding at most 1000 entries.
     */
    public CaffeineReactiveProducerCache() {
        this(Caffeine.newBuilder()
                .expireAfterAccess(Duration.ofMinutes(1L))
                .expireAfterWrite(Duration.ofMinutes(10L))
                .maximumSize(1000L));
    }

    /**
     * Creates a cache configured from a Caffeine specification.
     *
     * @param caffeineSpec the cache configuration
     */
    public CaffeineReactiveProducerCache(CaffeineSpec caffeineSpec) {
        this(Caffeine.from(caffeineSpec));
    }

    /**
     * Creates a cache from a pre-configured builder. The builder's scheduler, executor and
     * removal listener are set here: the system scheduler, Reactor's bounded elastic scheduler
     * and a listener that closes removed entries.
     *
     * @param caffeine the builder carrying the desired eviction/expiry configuration
     */
    public CaffeineReactiveProducerCache(Caffeine<Object, Object> caffeine) {
        this.cache = caffeine
                .scheduler(Scheduler.systemScheduler())
                .executor(Schedulers.boundedElastic()::schedule)
                .removalListener(this::onRemoval)
                .buildAsync();
    }

    /** Removal listener: closes the entry that left the cache, whatever the cause. */
    private void onRemoval(ProducerCacheKey producerCacheKey, ProducerCacheEntry producerCacheEntry, RemovalCause removalCause) {
        producerCacheEntry.close();
    }

    /**
     * Flushes and then closes the producer asynchronously without waiting for the result.
     *
     * <p>The close is attempted even when the flush fails, so a failed flush does not leave the
     * producer open. Failures of either step are only logged.
     *
     * @param producer the producer to retire
     */
    public static void flushAndCloseProducerAsync(Producer<?> producer) {
        producer.flushAsync()
                .exceptionally(flushError -> {
                    // Recover so the close below still runs.
                    log.error("Error flushing producer, closing it anyway", flushError);
                    return null;
                })
                .thenCompose(ignored -> producer.closeAsync())
                .whenComplete((ignored, closeError) -> {
                    if (closeError != null) {
                        log.error("Error flushing and closing producer", closeError);
                    }
                });
    }

    /**
     * Looks up the entry for the key, creating it from {@code mono} on a cache miss, and makes
     * sure the entry's producer is connected before emitting it.
     */
    private <T> Mono<ProducerCacheEntry> getProducerCacheEntry(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier) {
        return AdapterImplementationFactory
                .adaptPulsarFuture(() -> this.cache.get(producerCacheKey,
                        (key, executor) -> mono
                                .map(createdProducer -> new ProducerCacheEntry(createdProducer, supplier))
                                .toFuture()))
                .flatMap(entry -> entry.recreateIfClosed(mono));
    }

    /** Invalidates every cache entry, which triggers the removal listener for each of them. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.cache.synchronous().invalidateAll();
    }

    /**
     * Runs a single-result action against the cached producer for the key, holding a lease on
     * the cache entry for the duration of the action.
     *
     * @param producerCacheKey identifies the producer
     * @param mono             creates the producer when it is missing or no longer connected
     * @param supplier         supplies the transformer applied to producer actions; may be {@code null}
     * @param function         the action to run with the producer
     * @return the decorated result of the action
     */
    public <T, R> Mono<R> usingCachedProducer(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier, Function<Producer<T>, Mono<R>> function) {
        return Mono.usingWhen(
                leaseCacheEntry(producerCacheKey, mono, supplier),
                entry -> entry.decorateProducerAction(function.apply(entry.<T>getProducer())),
                this::returnCacheEntry);
    }

    /** Returns a Mono that releases the lease on the entry when subscribed. */
    private Mono<Object> returnCacheEntry(ProducerCacheEntry producerCacheEntry) {
        Objects.requireNonNull(producerCacheEntry);
        return Mono.fromRunnable(producerCacheEntry::releaseLease);
    }

    /** Resolves the entry for the key and activates a lease on it once it is emitted. */
    private <T> Mono<ProducerCacheEntry> leaseCacheEntry(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier) {
        return getProducerCacheEntry(producerCacheKey, mono, supplier)
                .doOnNext(ProducerCacheEntry::activateLease);
    }

    /**
     * Runs a multi-result action against the cached producer for the key, holding a lease on the
     * cache entry for the duration of the action.
     *
     * @param producerCacheKey identifies the producer
     * @param mono             creates the producer when it is missing or no longer connected
     * @param supplier         supplies the transformer applied to producer actions; may be {@code null}
     * @param function         the action to run with the producer
     * @return the decorated results of the action
     */
    public <T, R> Flux<R> usingCachedProducerMany(ProducerCacheKey producerCacheKey, Mono<Producer<T>> mono, Supplier<PublisherTransformer> supplier, Function<Producer<T>, Flux<R>> function) {
        return Flux.usingWhen(
                leaseCacheEntry(producerCacheKey, mono, supplier),
                entry -> entry.decorateProducerAction(function.apply(entry.<T>getProducer())),
                this::returnCacheEntry);
    }
}
