package akka.persistence.r2dbc.internal;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorRef$;
import akka.actor.typed.ActorRef$ActorRefOps$;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Extension;
import akka.actor.typed.ExtensionId;
import akka.actor.typed.pubsub.Topic;
import akka.actor.typed.pubsub.Topic$;
import akka.actor.typed.pubsub.Topic$Publish$;
import akka.annotation.InternalApi;
import akka.persistence.FilteredPayload$;
import akka.persistence.Persistence;
import akka.persistence.Persistence$;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.Tagged;
import akka.persistence.journal.Tagged$;
import akka.persistence.query.TimestampOffset;
import akka.persistence.query.TimestampOffset$;
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.r2dbc.PublishEventsDynamicSettings;
import akka.persistence.typed.PersistenceId$;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple3$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Range;
import scala.collection.immutable.Set;
import scala.math.Ordering$Int$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: PubSub.scala */
@InternalApi
/* loaded from: input_file:akka/persistence/r2dbc/internal/PubSub.class */
public class PubSub implements Extension {
    private final ActorSystem<?> system;
    private final ConcurrentHashMap<String, ActorRef<Object>> topics = new ConcurrentHashMap<>();
    private final Persistence persistenceExt;
    private final IndexedSeq<Range> sliceRanges;
    private final ConcurrentHashMap<Object, Range> sliceRangeLookup;
    private final long throughputCollectIntervalMillis;
    private final double throughputThreshold;
    private final int throughputSampler;
    private final AtomicLong throughputCounter;
    private volatile EWMA throughput;

    public static Extension apply(ActorSystem actorSystem) {
        return PubSub$.MODULE$.apply(actorSystem);
    }

    public static PubSub createExtension(ActorSystem<?> actorSystem) {
        return PubSub$.MODULE$.createExtension(actorSystem);
    }

    public static PubSub get(ActorSystem<?> actorSystem) {
        return PubSub$.MODULE$.get(actorSystem);
    }

    public static ExtensionId<PubSub> id() {
        return PubSub$.MODULE$.id();
    }

    public PubSub(ActorSystem<?> actorSystem) {
        this.system = actorSystem;
        this.persistenceExt = Persistence$.MODULE$.apply(actorSystem);
        PublishEventsDynamicSettings publishEventsDynamicSettings = new PublishEventsDynamicSettings(actorSystem.settings().config().getConfig("akka.persistence.r2dbc.journal.publish-events-dynamic"));
        this.sliceRanges = this.persistenceExt.sliceRanges(actorSystem.settings().config().getInt("akka.persistence.r2dbc.journal.publish-events-number-of-topics"));
        this.sliceRangeLookup = new ConcurrentHashMap<>();
        this.throughputCollectIntervalMillis = publishEventsDynamicSettings.throughputCollectInterval().toMillis();
        this.throughputThreshold = publishEventsDynamicSettings.throughputThreshold();
        this.throughputSampler = package$.MODULE$.min(1000, package$.MODULE$.max(1, publishEventsDynamicSettings.throughputThreshold() / 10));
        this.throughputCounter = new AtomicLong();
        this.throughput = EWMA$.MODULE$.apply(0.0d, EWMA$.MODULE$.alpha(publishEventsDynamicSettings.throughputCollectInterval().$times(2L), publishEventsDynamicSettings.throughputCollectInterval()));
    }

    /* renamed from: eventTopic, reason: merged with bridge method [inline-methods] */
    public <Event> ActorRef<Topic.Command<EventEnvelope<Event>>> eventTopics$$anonfun$1(String str, int i) {
        String str2 = topicName(str, i);
        return this.topics.computeIfAbsent(str2, str3 -> {
            return this.system.systemActorOf(Topic$.MODULE$.apply(str2, ClassTag$.MODULE$.apply(EventEnvelope.class)), str2, this.system.systemActorOf$default$3()).unsafeUpcast();
        }).narrow();
    }

    public <Event> Set<ActorRef<Topic.Command<EventEnvelope<Event>>>> eventTopics(String str, int i, int i2) {
        return RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(i), i2).map(obj -> {
            return eventTopics$$anonfun$1(str, BoxesRunTime.unboxToInt(obj));
        }).toSet();
    }

    private String topicName(String str, int i) {
        Range computeIfAbsent = this.sliceRangeLookup.computeIfAbsent(BoxesRunTime.boxToInteger(i), obj -> {
            return $anonfun$1(i, BoxesRunTime.unboxToInt(obj));
        });
        return URLEncoder.encode(new StringBuilder(8).append("r2dbc-").append(str).append("-").append(computeIfAbsent.min(Ordering$Int$.MODULE$)).append("-").append(computeIfAbsent.max(Ordering$Int$.MODULE$)).toString(), StandardCharsets.UTF_8.name());
    }

    public void publish(PersistentRepr persistentRepr, Instant instant) {
        Tuple3 apply;
        long incrementAndGet = this.throughputCounter.incrementAndGet();
        if (incrementAndGet % this.throughputSampler == 0) {
            EWMA ewma = this.throughput;
            long nanoTime = ((System.nanoTime() - ewma.nanoTime()) / 1000) / 1000;
            if (nanoTime >= this.throughputCollectIntervalMillis) {
                this.throughputCounter.set(0L);
                EWMA $colon$plus = ewma.$colon$plus((incrementAndGet * 1000.0d) / nanoTime);
                this.throughput = $colon$plus;
                if (ewma.value() < this.throughputThreshold && $colon$plus.value() >= this.throughputThreshold) {
                    PubSub$.akka$persistence$r2dbc$internal$PubSub$$$log.info("Disabled publishing of events. Throughput greater than [{}] events/s", BoxesRunTime.boxToDouble(this.throughputThreshold));
                } else if (ewma.value() < this.throughputThreshold || $colon$plus.value() >= this.throughputThreshold) {
                    PubSub$.akka$persistence$r2dbc$internal$PubSub$$$log.debug("Publishing of events is {}. Throughput is [{}] events/s", $colon$plus.value() < this.throughputThreshold ? "enabled" : "disabled", BoxesRunTime.boxToDouble($colon$plus.value()));
                } else {
                    PubSub$.akka$persistence$r2dbc$internal$PubSub$$$log.info("Enabled publishing of events. Throughput less than [{}] events/s", BoxesRunTime.boxToDouble(this.throughputThreshold));
                }
            }
        }
        if (this.throughput.value() < this.throughputThreshold) {
            String persistenceId = persistentRepr.persistenceId();
            String extractEntityType = PersistenceId$.MODULE$.extractEntityType(persistenceId);
            int sliceForPersistenceId = this.persistenceExt.sliceForPersistenceId(persistenceId);
            TimestampOffset apply2 = TimestampOffset$.MODULE$.apply(instant, instant, (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(persistenceId), BoxesRunTime.boxToLong(persistentRepr.sequenceNr()))})));
            Object payload = persistentRepr.payload();
            if (payload instanceof Tagged) {
                Tagged unapply = Tagged$.MODULE$.unapply((Tagged) payload);
                apply = Tuple3$.MODULE$.apply(Some$.MODULE$.apply(unapply._1()), unapply._2(), BoxesRunTime.boxToBoolean(false));
            } else {
                apply = FilteredPayload$.MODULE$.equals(payload) ? Tuple3$.MODULE$.apply(None$.MODULE$, Predef$.MODULE$.Set().empty(), BoxesRunTime.boxToBoolean(true)) : Tuple3$.MODULE$.apply(Some$.MODULE$.apply(payload), Predef$.MODULE$.Set().empty(), BoxesRunTime.boxToBoolean(false));
            }
            Tuple3 tuple3 = apply;
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(eventTopics$$anonfun$1(extractEntityType, sliceForPersistenceId)), Topic$Publish$.MODULE$.apply(new EventEnvelope(apply2, persistenceId, persistentRepr.sequenceNr(), (Option) tuple3._1(), instant.toEpochMilli(), persistentRepr.metadata(), extractEntityType, sliceForPersistenceId, BoxesRunTime.unboxToBoolean(tuple3._3()), EnvelopeOrigin$.MODULE$.SourcePubSub(), (Set) tuple3._2())));
        }
    }

    private final Range $anonfun$1$$anonfun$2(int i) {
        throw new IllegalArgumentException(new StringBuilder(22).append("Slice [").append(i).append("] not found in ").append(new StringBuilder(15).append("slice ranges [").append(this.sliceRanges.mkString(", ")).append("]").toString()).toString());
    }

    private final /* synthetic */ Range $anonfun$1(int i, int i2) {
        return (Range) this.sliceRanges.find(range -> {
            return range.contains(i);
        }).getOrElse(() -> {
            return r1.$anonfun$1$$anonfun$2(r2);
        });
    }
}
