package akka.persistence.r2dbc.journal;

import akka.Done;
import akka.Done$;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.SupervisorStrategy;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.scaladsl.adapter.package$;
import akka.actor.typed.scaladsl.adapter.package$ClassicActorSystemOps$;
import akka.annotation.InternalApi;
import akka.dispatch.ExecutionContexts$;
import akka.event.LogSource$;
import akka.event.Logging$;
import akka.event.LoggingAdapter;
import akka.pattern.CircuitBreaker;
import akka.persistence.AtomicWrite;
import akka.persistence.AtomicWrite$;
import akka.persistence.Persistence;
import akka.persistence.Persistence$;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.AsyncWriteJournal;
import akka.persistence.journal.EventAdapters;
import akka.persistence.journal.ReplayFilter;
import akka.persistence.journal.Tagged;
import akka.persistence.journal.Tagged$;
import akka.persistence.journal.WriteJournalBase;
import akka.persistence.query.PersistenceQuery$;
import akka.persistence.r2dbc.ConnectionFactoryProvider;
import akka.persistence.r2dbc.ConnectionFactoryProvider$;
import akka.persistence.r2dbc.R2dbcSettings;
import akka.persistence.r2dbc.R2dbcSettings$;
import akka.persistence.r2dbc.internal.InstantFactory$;
import akka.persistence.r2dbc.internal.PubSub;
import akka.persistence.r2dbc.internal.PubSub$;
import akka.persistence.r2dbc.journal.JournalDao;
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal;
import akka.persistence.typed.PersistenceId$;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension$;
import akka.serialization.Serializer;
import akka.serialization.Serializers$;
import akka.stream.Materializer$;
import akka.stream.scaladsl.Sink$;
import com.typesafe.config.Config;
import java.io.Serializable;
import java.time.Instant;
import java.util.HashMap;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.Iterator;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: R2dbcJournal.scala */
@InternalApi
/* loaded from: input_file:akka/persistence/r2dbc/journal/R2dbcJournal.class */
public final class R2dbcJournal implements Actor, WriteJournalBase, AsyncWriteJournal {
    private ActorContext context;
    private ActorRef self;
    private Persistence persistence;
    private EventAdapters akka$persistence$journal$WriteJournalBase$$eventAdapters;
    private Persistence akka$persistence$journal$AsyncWriteJournal$$extension;
    private boolean akka$persistence$journal$AsyncWriteJournal$$publish;
    private Config akka$persistence$journal$AsyncWriteJournal$$config;
    private CircuitBreaker akka$persistence$journal$AsyncWriteJournal$$breaker;
    private ReplayFilter.Mode akka$persistence$journal$AsyncWriteJournal$$replayFilterMode;
    private int akka$persistence$journal$AsyncWriteJournal$$replayFilterWindowSize;
    private int akka$persistence$journal$AsyncWriteJournal$$replayFilterMaxOldWriters;
    private ActorRef akka$persistence$journal$AsyncWriteJournal$$resequencer;
    private long akka$persistence$journal$AsyncWriteJournal$$resequencerCounter;
    private PartialFunction receiveWriteJournal;
    private final ActorSystem system;
    private final ExecutionContext ec;
    private final LoggingAdapter log;
    private final Persistence persistenceExt;
    private final Serialization serialization;
    private final R2dbcSettings journalSettings;
    private final JournalDao journalDao;
    private final R2dbcReadJournal query;
    private final Option<PubSub> pubSub;
    public final HashMap<String, Future<?>> akka$persistence$r2dbc$journal$R2dbcJournal$$writesInProgress;

    /* compiled from: R2dbcJournal.scala */
    /* loaded from: input_file:akka/persistence/r2dbc/journal/R2dbcJournal$WriteFinished.class */
    public static class WriteFinished implements Product, Serializable {
        private final String persistenceId;
        private final Future done;

        public static WriteFinished apply(String str, Future<?> future) {
            return R2dbcJournal$WriteFinished$.MODULE$.apply(str, future);
        }

        public static WriteFinished fromProduct(Product product) {
            return R2dbcJournal$WriteFinished$.MODULE$.m42fromProduct(product);
        }

        public static WriteFinished unapply(WriteFinished writeFinished) {
            return R2dbcJournal$WriteFinished$.MODULE$.unapply(writeFinished);
        }

        public WriteFinished(String str, Future<?> future) {
            this.persistenceId = str;
            this.done = future;
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public boolean equals(Object obj) {
            boolean z;
            if (this != obj) {
                if (obj instanceof WriteFinished) {
                    WriteFinished writeFinished = (WriteFinished) obj;
                    String persistenceId = persistenceId();
                    String persistenceId2 = writeFinished.persistenceId();
                    if (persistenceId != null ? persistenceId.equals(persistenceId2) : persistenceId2 == null) {
                        Future<?> done = done();
                        Future<?> done2 = writeFinished.done();
                        if (done != null ? done.equals(done2) : done2 == null) {
                            if (writeFinished.canEqual(this)) {
                                z = true;
                            }
                        }
                    }
                    z = false;
                } else {
                    z = false;
                }
                if (!z) {
                    return false;
                }
            }
            return true;
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof WriteFinished;
        }

        public int productArity() {
            return 2;
        }

        public String productPrefix() {
            return "WriteFinished";
        }

        public Object productElement(int i) {
            if (0 == i) {
                return _1();
            }
            if (1 == i) {
                return _2();
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
        }

        public String productElementName(int i) {
            if (0 == i) {
                return "persistenceId";
            }
            if (1 == i) {
                return "done";
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
        }

        public String persistenceId() {
            return this.persistenceId;
        }

        public Future<?> done() {
            return this.done;
        }

        public WriteFinished copy(String str, Future<?> future) {
            return new WriteFinished(str, future);
        }

        public String copy$default$1() {
            return persistenceId();
        }

        public Future<?> copy$default$2() {
            return done();
        }

        public String _1() {
            return persistenceId();
        }

        public Future<?> _2() {
            return done();
        }
    }

    public static PersistentRepr deserializeRow(Serialization serialization, JournalDao.SerializedJournalRow serializedJournalRow) {
        return R2dbcJournal$.MODULE$.deserializeRow(serialization, serializedJournalRow);
    }

    public R2dbcJournal(Config config, String str) {
        Actor.$init$(this);
        WriteJournalBase.$init$(this);
        AsyncWriteJournal.$init$(this);
        this.system = package$ClassicActorSystemOps$.MODULE$.toTyped$extension(package$.MODULE$.ClassicActorSystemOps(context().system()));
        this.ec = context().dispatcher();
        this.log = Logging$.MODULE$.apply(context().system(), R2dbcJournal.class, LogSource$.MODULE$.fromAnyClass());
        this.persistenceExt = Persistence$.MODULE$.apply(system());
        String replaceAll = str.replaceAll("\\.journal$", "");
        this.serialization = SerializationExtension$.MODULE$.apply(context().system());
        this.journalSettings = R2dbcSettings$.MODULE$.apply(context().system().settings().config().getConfig(replaceAll));
        this.journalDao = new JournalDao(this.journalSettings, ((ConnectionFactoryProvider) ConnectionFactoryProvider$.MODULE$.apply(system())).connectionFactoryFor(new StringBuilder(19).append(replaceAll).append(".connection-factory").toString()), ec(), system());
        this.query = PersistenceQuery$.MODULE$.apply(system()).readJournalFor(new StringBuilder(6).append(replaceAll).append(".query").toString());
        this.pubSub = this.journalSettings.journalPublishEvents() ? Some$.MODULE$.apply(PubSub$.MODULE$.apply(system())) : None$.MODULE$;
        this.akka$persistence$r2dbc$journal$R2dbcJournal$$writesInProgress = new HashMap<>();
        Statics.releaseFence();
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public /* bridge */ /* synthetic */ ActorRef sender() {
        return Actor.sender$(this);
    }

    @InternalApi
    public /* bridge */ /* synthetic */ void aroundReceive(PartialFunction partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPreRestart(Throwable th, Option option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public /* bridge */ /* synthetic */ SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public /* bridge */ /* synthetic */ void preStart() throws Exception {
        Actor.preStart$(this);
    }

    public /* bridge */ /* synthetic */ void postStop() throws Exception {
        Actor.postStop$(this);
    }

    public /* bridge */ /* synthetic */ void preRestart(Throwable th, Option option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    public /* bridge */ /* synthetic */ void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public /* bridge */ /* synthetic */ void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    public Persistence persistence() {
        return this.persistence;
    }

    public EventAdapters akka$persistence$journal$WriteJournalBase$$eventAdapters() {
        return this.akka$persistence$journal$WriteJournalBase$$eventAdapters;
    }

    public void akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Persistence persistence) {
        this.persistence = persistence;
    }

    public void akka$persistence$journal$WriteJournalBase$_setter_$akka$persistence$journal$WriteJournalBase$$eventAdapters_$eq(EventAdapters eventAdapters) {
        this.akka$persistence$journal$WriteJournalBase$$eventAdapters = eventAdapters;
    }

    public /* bridge */ /* synthetic */ Seq preparePersistentBatch(Seq seq) {
        return WriteJournalBase.preparePersistentBatch$(this, seq);
    }

    public /* bridge */ /* synthetic */ Seq adaptFromJournal(PersistentRepr persistentRepr) {
        return WriteJournalBase.adaptFromJournal$(this, persistentRepr);
    }

    public /* bridge */ /* synthetic */ PersistentRepr adaptToJournal(PersistentRepr persistentRepr) {
        return WriteJournalBase.adaptToJournal$(this, persistentRepr);
    }

    public Persistence akka$persistence$journal$AsyncWriteJournal$$extension() {
        return this.akka$persistence$journal$AsyncWriteJournal$$extension;
    }

    public boolean akka$persistence$journal$AsyncWriteJournal$$publish() {
        return this.akka$persistence$journal$AsyncWriteJournal$$publish;
    }

    public Config akka$persistence$journal$AsyncWriteJournal$$config() {
        return this.akka$persistence$journal$AsyncWriteJournal$$config;
    }

    public CircuitBreaker akka$persistence$journal$AsyncWriteJournal$$breaker() {
        return this.akka$persistence$journal$AsyncWriteJournal$$breaker;
    }

    public ReplayFilter.Mode akka$persistence$journal$AsyncWriteJournal$$replayFilterMode() {
        return this.akka$persistence$journal$AsyncWriteJournal$$replayFilterMode;
    }

    public int akka$persistence$journal$AsyncWriteJournal$$replayFilterWindowSize() {
        return this.akka$persistence$journal$AsyncWriteJournal$$replayFilterWindowSize;
    }

    public int akka$persistence$journal$AsyncWriteJournal$$replayFilterMaxOldWriters() {
        return this.akka$persistence$journal$AsyncWriteJournal$$replayFilterMaxOldWriters;
    }

    public ActorRef akka$persistence$journal$AsyncWriteJournal$$resequencer() {
        return this.akka$persistence$journal$AsyncWriteJournal$$resequencer;
    }

    public long akka$persistence$journal$AsyncWriteJournal$$resequencerCounter() {
        return this.akka$persistence$journal$AsyncWriteJournal$$resequencerCounter;
    }

    public final PartialFunction receiveWriteJournal() {
        return this.receiveWriteJournal;
    }

    public void akka$persistence$journal$AsyncWriteJournal$$resequencerCounter_$eq(long j) {
        this.akka$persistence$journal$AsyncWriteJournal$$resequencerCounter = j;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$extension_$eq(Persistence persistence) {
        this.akka$persistence$journal$AsyncWriteJournal$$extension = persistence;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$publish_$eq(boolean z) {
        this.akka$persistence$journal$AsyncWriteJournal$$publish = z;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$config_$eq(Config config) {
        this.akka$persistence$journal$AsyncWriteJournal$$config = config;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$breaker_$eq(CircuitBreaker circuitBreaker) {
        this.akka$persistence$journal$AsyncWriteJournal$$breaker = circuitBreaker;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayFilterMode_$eq(ReplayFilter.Mode mode) {
        this.akka$persistence$journal$AsyncWriteJournal$$replayFilterMode = mode;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayFilterWindowSize_$eq(int i) {
        this.akka$persistence$journal$AsyncWriteJournal$$replayFilterWindowSize = i;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayFilterMaxOldWriters_$eq(int i) {
        this.akka$persistence$journal$AsyncWriteJournal$$replayFilterMaxOldWriters = i;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$resequencer_$eq(ActorRef actorRef) {
        this.akka$persistence$journal$AsyncWriteJournal$$resequencer = actorRef;
    }

    public void akka$persistence$journal$AsyncWriteJournal$_setter_$receiveWriteJournal_$eq(PartialFunction partialFunction) {
        this.receiveWriteJournal = partialFunction;
    }

    public /* bridge */ /* synthetic */ PartialFunction receive() {
        return AsyncWriteJournal.receive$(this);
    }

    public ActorSystem<?> system() {
        return this.system;
    }

    public ExecutionContext ec() {
        return this.ec;
    }

    public PartialFunction<Object, BoxedUnit> receivePluginInternal() {
        return new R2dbcJournal$$anon$1(this);
    }

    public Future<Seq<Try<BoxedUnit>>> asyncWriteMessages(Seq<AtomicWrite> seq) {
        String persistenceId = ((AtomicWrite) seq.head()).persistenceId();
        Future<?> publish = publish(seq, seq.size() == 1 ? atomicWrite$2((AtomicWrite) seq.head()) : atomicWrite$2(AtomicWrite$.MODULE$.apply((Seq) seq.flatMap(atomicWrite -> {
            return atomicWrite.payload();
        }))));
        this.akka$persistence$r2dbc$journal$R2dbcJournal$$writesInProgress.put(persistenceId, publish);
        publish.onComplete(r8 -> {
            self().$bang(R2dbcJournal$WriteFinished$.MODULE$.apply(persistenceId, publish), self());
        }, ec());
        return publish.map(done -> {
            return scala.package$.MODULE$.Nil();
        }, ExecutionContexts$.MODULE$.parasitic());
    }

    private Future<Done> publish(Seq<AtomicWrite> seq, Future<Instant> future) {
        Some some = this.pubSub;
        if (some instanceof Some) {
            PubSub pubSub = (PubSub) some.value();
            return future.map(instant -> {
                seq.iterator().flatMap(atomicWrite -> {
                    return atomicWrite.payload().iterator();
                }).foreach(persistentRepr -> {
                    pubSub.publish(persistentRepr, instant);
                });
                return Done$.MODULE$;
            }, ec());
        }
        if (None$.MODULE$.equals(some)) {
            return future.map(instant2 -> {
                return Done$.MODULE$;
            }, ExecutionContexts$.MODULE$.parasitic());
        }
        throw new MatchError(some);
    }

    public Future<BoxedUnit> asyncDeleteMessagesTo(String str, long j) {
        this.log.debug("asyncDeleteMessagesTo persistenceId [{}], toSequenceNr [{}]", str, BoxesRunTime.boxToLong(j));
        return this.journalDao.deleteEventsTo(str, j, false);
    }

    public Future<BoxedUnit> asyncReplayMessages(String str, long j, long j2, long j3, Function1<PersistentRepr, BoxedUnit> function1) {
        this.log.debug("asyncReplayMessages persistenceId [{}], fromSequenceNr [{}]", str, BoxesRunTime.boxToLong(j));
        return ((Future) this.query.internalCurrentEventsByPersistenceId(str, j, j3 == Long.MAX_VALUE ? j2 : scala.math.package$.MODULE$.min(j2, (j + j3) - 1)).runWith(Sink$.MODULE$.foreach(serializedJournalRow -> {
            function1.apply(R2dbcJournal$.MODULE$.deserializeRow(this.serialization, serializedJournalRow));
        }), Materializer$.MODULE$.matFromSystem(system()))).map(done -> {
        }, ec());
    }

    public Future<Object> asyncReadHighestSequenceNr(String str, long j) {
        Future successful;
        this.log.debug("asyncReadHighestSequenceNr [{}] [{}]", str, BoxesRunTime.boxToLong(j));
        Some apply = Option$.MODULE$.apply(this.akka$persistence$r2dbc$journal$R2dbcJournal$$writesInProgress.get(str));
        if (apply instanceof Some) {
            Future future = (Future) apply.value();
            this.log.debug("Write in progress for [{}], deferring highest seq nr until write completed", str);
            successful = future.recover(new R2dbcJournal$$anon$2(), ExecutionContexts$.MODULE$.parasitic());
        } else {
            if (!None$.MODULE$.equals(apply)) {
                throw new MatchError(apply);
            }
            successful = Future$.MODULE$.successful(Done$.MODULE$);
        }
        return successful.flatMap(obj -> {
            return this.journalDao.readHighestSequenceNr(str, j);
        }, ec());
    }

    private final Seq $anonfun$1(AtomicWrite atomicWrite, Instant instant) {
        return (Seq) atomicWrite.payload().map(persistentRepr -> {
            Tuple2 apply;
            Object payload = persistentRepr.payload();
            if (payload instanceof Tagged) {
                Tagged unapply = Tagged$.MODULE$.unapply((Tagged) payload);
                apply = Tuple2$.MODULE$.apply(unapply._1(), unapply._2());
            } else {
                apply = Tuple2$.MODULE$.apply(payload, Predef$.MODULE$.Set().empty());
            }
            Tuple2 tuple2 = apply;
            Object _1 = tuple2._1();
            Set<String> set = (Set) tuple2._2();
            String extractEntityType = PersistenceId$.MODULE$.extractEntityType(persistentRepr.persistenceId());
            int sliceForPersistenceId = this.persistenceExt.sliceForPersistenceId(persistentRepr.persistenceId());
            byte[] bArr = (byte[]) this.serialization.serialize(_1).get();
            Serializer findSerializerFor = this.serialization.findSerializerFor(_1);
            String manifestFor = Serializers$.MODULE$.manifestFor(findSerializerFor, _1);
            return JournalDao$SerializedJournalRow$.MODULE$.apply(sliceForPersistenceId, extractEntityType, persistentRepr.persistenceId(), persistentRepr.sequenceNr(), instant, JournalDao$.MODULE$.EmptyDbTimestamp(), Some$.MODULE$.apply(bArr), findSerializerFor.identifier(), manifestFor, persistentRepr.writerUuid(), set, persistentRepr.metadata().map(obj -> {
                byte[] bArr2 = (byte[]) this.serialization.serialize(obj).get();
                Serializer findSerializerFor2 = this.serialization.findSerializerFor(obj);
                String manifestFor2 = Serializers$.MODULE$.manifestFor(findSerializerFor2, obj);
                return JournalDao$SerializedEventMetadata$.MODULE$.apply(findSerializerFor2.identifier(), manifestFor2, bArr2);
            }));
        });
    }

    private final Future atomicWrite$2(AtomicWrite atomicWrite) {
        Instant now = this.journalSettings.useAppTimestamp() ? InstantFactory$.MODULE$.now() : JournalDao$.MODULE$.EmptyDbTimestamp();
        Success apply = Try$.MODULE$.apply(() -> {
            return r1.$anonfun$1(r2, r3);
        });
        if (apply instanceof Success) {
            return this.journalDao.writeEvents((Seq) apply.value());
        }
        if (apply instanceof Failure) {
            return Future$.MODULE$.failed(((Failure) apply).exception());
        }
        throw new MatchError(apply);
    }
}
