/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.producercache;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.lhotari.reactive.pulsar.internal.adapter.AdapterImplementationFactory;
import com.github.lhotari.reactive.pulsar.resourceadapter.ProducerCacheKey;
import com.github.lhotari.reactive.pulsar.resourceadapter.PublisherTransformer;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveProducerCache;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class CaffeineReactiveProducerCache
implements ReactiveProducerCache,
AutoCloseable {
    // SLF4J logger; also used by the nested ProducerCacheEntry class.
    private static final Logger log = LoggerFactory.getLogger(CaffeineReactiveProducerCache.class);
    // Caffeine async cache from producer cache key to its entry; assigned once in the builder-taking constructor.
    final AsyncCache<ProducerCacheKey, ProducerCacheEntry> cache;

    /**
     * Creates a cache with the default policy: entries expire 1 minute after their
     * last access and 10 minutes after being written, and at most 1000 entries are kept.
     */
    public CaffeineReactiveProducerCache() {
        this(Caffeine.newBuilder()
                .expireAfterAccess(Duration.ofMinutes(1L))
                .expireAfterWrite(Duration.ofMinutes(10L))
                .maximumSize(1000L));
    }

    /**
     * Creates a cache whose policy is taken from the given Caffeine specification.
     *
     * @param caffeineSpec specification handed to {@code Caffeine.from} to obtain the builder
     */
    public CaffeineReactiveProducerCache(CaffeineSpec caffeineSpec) {
        this(Caffeine.from(caffeineSpec));
    }

    /**
     * Builds the async cache from the given builder.
     *
     * <p>On top of whatever the caller configured, this installs Caffeine's system
     * scheduler, an executor that hands each task to Reactor's bounded-elastic
     * scheduler, and {@link #onRemoval} as the removal listener.
     *
     * @param caffeineBuilder builder carrying the caller's cache policy
     */
    public CaffeineReactiveProducerCache(Caffeine<Object, Object> caffeineBuilder) {
        // The scheduler is looked up per task, exactly as the original inline lambda did.
        Executor reactorExecutor = task -> Schedulers.boundedElastic().schedule(task);
        this.cache = caffeineBuilder
                .scheduler(Scheduler.systemScheduler())
                .executor(reactorExecutor)
                .removalListener(this::onRemoval)
                .buildAsync();
    }

    /**
     * Removal listener registered on the cache: closes the entry that was removed.
     *
     * @param key   key of the removed mapping (not used)
     * @param entry the removed entry, which gets closed
     * @param cause why the mapping was removed (not used)
     */
    private void onRemoval(ProducerCacheKey key, ProducerCacheEntry entry, RemovalCause cause) {
        entry.close();
    }

    /**
     * Flushes the given producer and then closes it, without blocking the caller.
     *
     * <p>The close is attempted even when the flush fails; previously a failed flush
     * short-circuited the chain and the producer was never closed. Failures of either
     * step are logged and not propagated.
     *
     * @param producer the producer to flush and close
     */
    private static void flushAndCloseProducerAsync(Producer<?> producer) {
        producer.flushAsync()
                .<Void>handle((ignoredResult, flushError) -> {
                    if (flushError != null) {
                        log.error("Error flushing producer before closing", flushError);
                    }
                    // Absorb the flush failure so the close stage below always runs.
                    return null;
                })
                .thenCompose(__ -> producer.closeAsync())
                .whenComplete((r, t) -> {
                    if (t != null) {
                        log.error("Error flushing and closing producer", t);
                    }
                });
    }

    /**
     * Looks up the cache entry for the given key, creating it from {@code producerMono}
     * when absent, and then makes sure the entry holds a connected producer.
     *
     * @param cacheKey                  key identifying the producer in the cache
     * @param producerMono              source used to create the producer (and to recreate it if it was closed)
     * @param producerActionTransformer supplier of the transformer stored in a newly created entry; may be {@code null}
     * @return a Mono emitting the cache entry after {@code recreateIfClosed} has been applied to it
     */
    private <T> Mono<ProducerCacheEntry> getProducerCacheEntry(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer) {
        // The key is passed with its declared type; the cache is keyed by ProducerCacheKey, not Object.
        return AdapterImplementationFactory.adaptPulsarFuture(
                () -> this.cache.get(cacheKey, (key, executor) -> CaffeineReactiveProducerCache.lambda$getProducerCacheEntry$3(producerMono, producerActionTransformer, key, executor)))
                .flatMap(entry -> entry.recreateIfClosed(producerMono));
    }

    /**
     * Invalidates every entry through the synchronous view of the cache.
     * Removed entries are handed to the removal listener installed in the constructor.
     */
    @Override
    public void close() {
        this.cache.synchronous().invalidateAll();
    }

    /**
     * Runs a single-result action against a cached producer.
     *
     * <p>A lease on the cache entry is taken before the action runs and is released
     * through {@code Mono.usingWhen}'s cleanup hook afterwards. The action's Mono is
     * passed through the entry's producer-action transformer.
     *
     * @param cacheKey                  key identifying the producer in the cache
     * @param producerMono              source used to create the producer when it is not cached
     * @param producerActionTransformer supplier of the transformer for a newly created entry; may be {@code null}
     * @param usingProducerAction       action to run with the producer
     * @return the decorated result of the action
     */
    public <T, R> Mono<R> usingCachedProducer(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer, Function<Producer<T>, Mono<R>> usingProducerAction) {
        return Mono.usingWhen(
                this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
                // Keeping the Mono typed lets the overloaded decorateProducerAction reference resolve to its Mono variant.
                leasedEntry -> usingProducerAction.apply(leasedEntry.getProducer()).as(leasedEntry::decorateProducerAction),
                this::returnCacheEntry);
    }

    /**
     * Creates the cleanup step that gives a lease back.
     *
     * @param producerCacheEntry entry whose lease is released
     * @return a Mono that calls {@code releaseLease} on the entry when subscribed
     */
    private Mono<Object> returnCacheEntry(ProducerCacheEntry producerCacheEntry) {
        Runnable release = producerCacheEntry::releaseLease;
        return Mono.fromRunnable(release);
    }

    /**
     * Resolves the cache entry for the key and registers a lease on it when it is emitted.
     *
     * @param cacheKey                  key identifying the producer in the cache
     * @param producerMono              source used to create the producer when it is not cached
     * @param producerActionTransformer supplier of the transformer for a newly created entry; may be {@code null}
     * @return a Mono emitting the entry, with its lease count already incremented
     */
    private <T> Mono<ProducerCacheEntry> leaseCacheEntry(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer) {
        Mono<ProducerCacheEntry> resolvedEntry = this.getProducerCacheEntry(cacheKey, producerMono, producerActionTransformer);
        return resolvedEntry.doOnNext(entry -> entry.activateLease());
    }

    /**
     * Runs a multi-result action against a cached producer.
     *
     * <p>A lease on the cache entry is taken before the action runs and is released
     * through {@code Flux.usingWhen}'s cleanup hook afterwards. The action's Flux is
     * passed through the entry's producer-action transformer.
     *
     * @param cacheKey                  key identifying the producer in the cache
     * @param producerMono              source used to create the producer when it is not cached
     * @param producerActionTransformer supplier of the transformer for a newly created entry; may be {@code null}
     * @param usingProducerAction       action to run with the producer
     * @return the decorated results of the action
     */
    public <T, R> Flux<R> usingCachedProducerMany(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer, Function<Producer<T>, Flux<R>> usingProducerAction) {
        return Flux.usingWhen(
                this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
                // Keeping the Flux typed lets the overloaded decorateProducerAction reference resolve to its Flux variant.
                leasedEntry -> usingProducerAction.apply(leasedEntry.getProducer()).as(leasedEntry::decorateProducerAction),
                this::returnCacheEntry);
    }

    /**
     * Cache mapping function body: maps the producer emitted by {@code producerMono}
     * into a new cache entry and exposes the result as a future. The key and executor
     * arguments are not used.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static /* synthetic */ CompletableFuture lambda$getProducerCacheEntry$3(Mono producerMono, Supplier producerActionTransformer, ProducerCacheKey __, Executor ___) {
        // With a raw Mono the mapped element is typed Object, so it is cast back to Producer for the helper.
        return producerMono.map(producer -> CaffeineReactiveProducerCache.lambda$getProducerCacheEntry$2(producerActionTransformer, (Producer) producer)).toFuture();
    }

    /**
     * Wraps a freshly created producer in a cache entry.
     *
     * @param producerActionTransformer transformer supplier for the entry; may be {@code null}
     * @param producer                  producer the entry will hold
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static /* synthetic */ ProducerCacheEntry lambda$getProducerCacheEntry$2(Supplier producerActionTransformer, Producer producer) {
        // The supplier is passed through as-is; a null supplier is handled by the entry's constructor.
        return new ProducerCacheEntry(producer, producerActionTransformer);
    }

    /**
     * Cache value that holds one producer together with the bookkeeping needed to share it:
     * a count of active leases, a "removed" flag set when the entry is closed, and a
     * replaceable {@link Mono} used to obtain a new producer when the current one is
     * no longer connected.
     */
    static class ProducerCacheEntry {
        /** Current producer; {@code null} while none is held (after closing, or while a replacement is being created). */
        private final AtomicReference<Producer<?>> producer = new AtomicReference<>();
        /** Source of the producer consulted by {@link #recreateIfClosed(Mono)}; swapped via compare-and-set when a replacement is needed. */
        private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference<>();
        /** Number of leases activated and not yet released. */
        private final AtomicInteger activeLeases = new AtomicInteger(0);
        /** Transformer applied to every producer action run through this entry. */
        private final PublisherTransformer producerActionTransformer;
        /** Set by {@link #close()}; lets the release of the last lease close the producer. */
        private volatile boolean removed;

        /**
         * @param producer                  the producer initially held by this entry
         * @param producerActionTransformer supplier of the transformer for producer actions;
         *                                  when {@code null}, the identity transformer is used
         */
        public ProducerCacheEntry(Producer<?> producer, Supplier<PublisherTransformer> producerActionTransformer) {
            this.producer.set(producer);
            // Until a replacement is installed, the creator simply yields whatever producer is currently held.
            this.producerCreator.set(Mono.fromSupplier(this.producer::get));
            this.producerActionTransformer = producerActionTransformer != null ? producerActionTransformer.get() : PublisherTransformer.identity();
        }

        /** Registers one more active lease. */
        void activateLease() {
            this.activeLeases.incrementAndGet();
        }

        /**
         * Gives one lease back. When this was the last lease and the entry has already
         * been marked removed, the producer is closed now.
         */
        void releaseLease() {
            int currentLeases = this.activeLeases.decrementAndGet();
            if (currentLeases == 0 && this.removed) {
                this.closeProducer();
            }
        }

        /** @return the current number of active leases */
        public int getActiveLeases() {
            return this.activeLeases.get();
        }

        /**
         * @return the producer currently held, viewed with the caller's message type;
         *         {@code null} when no producer is held
         */
        @SuppressWarnings("unchecked")
        public <T> Producer<T> getProducer() {
            // The entry stores Producer<?>; the message type is chosen by the caller, hence the unchecked cast.
            return (Producer<T>) this.producer.get();
        }

        /**
         * Returns a Mono that emits this entry once it holds a connected producer.
         *
         * <p>If the held producer is connected, the entry is emitted right away. If a
         * producer is held but not connected, a new cached creator built from
         * {@code producerMono} is installed via compare-and-set; the caller that wins
         * clears the producer reference and flushes and closes the old producer. In all
         * remaining cases the current creator is subscribed and its result is accepted
         * only if connected, repeating when empty (maximum of 5, one second delay).
         *
         * @param producerMono source used to create a replacement producer
         */
        <T> Mono<ProducerCacheEntry> recreateIfClosed(Mono<Producer<T>> producerMono) {
            return Mono.defer(() -> {
                Producer<?> p = this.producer.get();
                if (p != null) {
                    if (p.isConnected()) {
                        return Mono.just(this);
                    }
                    Mono<? extends Producer<?>> previousUpdater = this.producerCreator.get();
                    // Only the caller that wins this compare-and-set disposes of the old producer.
                    if (this.producerCreator.compareAndSet(previousUpdater, this.createCachedProducerMono(producerMono))) {
                        this.producer.compareAndSet(p, null);
                        CaffeineReactiveProducerCache.flushAndCloseProducerAsync(p);
                    }
                }
                return Mono.defer(() -> this.producerCreator.get())
                        .filter(Producer::isConnected)
                        .repeatWhenEmpty(5, flux -> flux.delayElements(Duration.ofSeconds(1L)))
                        .thenReturn(this);
            });
        }

        /**
         * Wraps {@code producerMono} so that the producer it emits is logged and stored
         * in this entry, and caches the outcome via {@code Mono.cache()}.
         */
        private <T> Mono<Producer<T>> createCachedProducerMono(Mono<Producer<T>> producerMono) {
            return producerMono.doOnNext(newProducer -> {
                log.info("Replaced closed producer for topic {}", newProducer.getTopic());
                this.producer.set(newProducer);
            }).cache();
        }

        /**
         * Marks the entry as removed. The producer is closed immediately when no lease
         * is active; otherwise {@link #releaseLease()} closes it when the last lease is
         * given back. The transformer is disposed in either case.
         */
        void close() {
            this.removed = true;
            if (this.activeLeases.get() == 0) {
                this.closeProducer();
            }
            this.producerActionTransformer.dispose();
        }

        /**
         * Clears the producer reference and starts an asynchronous flush-and-close.
         * The compare-and-set ensures only one caller hands a given producer off for closing.
         */
        private void closeProducer() {
            Producer<?> p = this.producer.get();
            if (p != null && this.producer.compareAndSet(p, null)) {
                log.info("Closed producer {} for topic {}", p.getProducerName(), p.getTopic());
                CaffeineReactiveProducerCache.flushAndCloseProducerAsync(p);
            }
        }

        /** Applies this entry's transformer to a multi-result producer action. */
        <R> Publisher<? extends R> decorateProducerAction(Flux<R> source) {
            return this.producerActionTransformer.transform(source);
        }

        /** Applies this entry's transformer to a single-result producer action and adapts the result back to a Mono. */
        @SuppressWarnings({"rawtypes", "unchecked"})
        <R> Mono<? extends R> decorateProducerAction(Mono<R> source) {
            return Mono.from((Publisher) this.producerActionTransformer.transform(source));
        }
    }
}

