package akka.projection.grpc.internal;

import akka.Done;
import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.scaladsl.package$LoggerOps$;
import akka.annotation.InternalApi;
import akka.grpc.GrpcServiceException;
import akka.grpc.scaladsl.Metadata;
import akka.persistence.query.TimestampOffset;
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdStartingFromSnapshotQuery;
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery;
import akka.persistence.query.typed.scaladsl.EventTimestampQuery;
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery;
import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery;
import akka.persistence.query.typed.scaladsl.LoadEventQuery;
import akka.persistence.typed.PersistenceId$;
import akka.projection.grpc.internal.proto.Event;
import akka.projection.grpc.internal.proto.EventProducerServicePowerApi;
import akka.projection.grpc.internal.proto.EventTimestampRequest;
import akka.projection.grpc.internal.proto.EventTimestampResponse;
import akka.projection.grpc.internal.proto.EventTimestampResponse$;
import akka.projection.grpc.internal.proto.FilteredEvent;
import akka.projection.grpc.internal.proto.FilteredEvent$;
import akka.projection.grpc.internal.proto.InitReq;
import akka.projection.grpc.internal.proto.LoadEventRequest;
import akka.projection.grpc.internal.proto.LoadEventResponse;
import akka.projection.grpc.internal.proto.LoadEventResponse$;
import akka.projection.grpc.internal.proto.StreamIn;
import akka.projection.grpc.internal.proto.StreamOut;
import akka.projection.grpc.internal.proto.StreamOut$;
import akka.projection.grpc.producer.scaladsl.EventProducer;
import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor;
import akka.stream.scaladsl.BidiFlow$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.google.protobuf.timestamp.Timestamp$;
import io.grpc.Status;
import java.time.Instant;
import org.slf4j.Logger;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.SeqFactory;
import scala.collection.SeqFactory$UnapplySeqWrapper$;
import scala.collection.SeqOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: EventProducerServiceImpl.scala */
@ScalaSignature(bytes = "\u0006\u0005\tuqA\u0002\u000e\u001c\u0011\u0003\t3E\u0002\u0004&7!\u0005\u0011E\n\u0005\u0006[\u0005!\ta\f\u0005\ba\u0005\u0011\r\u0011\"\u00012\u0011\u0019Q\u0014\u0001)A\u0005e!91(\u0001b\u0001\n\u0013a\u0004BB$\u0002A\u0003%QHB\u0003&7\u0001\t\u0003\u000b\u0003\u0005X\u000f\t\u0005\t\u0015!\u0003Y\u0011!awA!A!\u0002\u0013i\u0007BCA\u0007\u000f\t\u0005\t\u0015!\u0003\u0002\u0010!Q\u0011qC\u0004\u0003\u0002\u0003\u0006I!!\u0007\t\u0015\u0005\u0005rA!A!\u0002\u0013\t\u0019\u0003\u0003\u0006\u0002,\u001d\u0011\t\u0011)A\u0005\u0003[A!\"!\u0013\b\u0005\u0003\u0005\u000b\u0011BA&\u0011\u0019is\u0001\"\u0001\u0002X!I\u0011\u0011O\u0004C\u0002\u0013%\u00111\u000f\u0005\t\u0003w:\u0001\u0015!\u0003\u0002v!I\u0011QP\u0004C\u0002\u0013%\u0011q\u0010\u0005\t\u0003\u0007;\u0001\u0015!\u0003\u0002\u0002\"9\u0011QQ\u0004\u0005\n\u0005\u001d\u0005bBAS\u000f\u0011%\u0011q\u0015\u0005\b\u0003W;A\u0011IAW\u0011\u001d\t9n\u0002C\u0005\u00033Dq!!<\b\t\u0003\ny\u000fC\u0004\u0003\u0006\u001d!\tEa\u0002\u00021\u00153XM\u001c;Qe>$WoY3s'\u0016\u0014h/[2f\u00136\u0004HN\u0003\u0002\u001d;\u0005A\u0011N\u001c;fe:\fGN\u0003\u0002\u001f?\u0005!qM\u001d9d\u0015\t\u0001\u0013%\u0001\u0006qe>TWm\u0019;j_:T\u0011AI\u0001\u0005C.\\\u0017\r\u0005\u0002%\u00035\t1D\u0001\rFm\u0016tG\u000f\u0015:pIV\u001cWM]*feZL7-Z%na2\u001c\"!A\u0014\u0011\u0005!ZS\"A\u0015\u000b\u0003)\nQa]2bY\u0006L!\u0001L\u0015\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}\r\u0001A#A\u0012\u0002\u00071|w-F\u00013!\t\u0019\u0004(D\u00015\u0015\t)d'A\u0003tY\u001a$$NC\u00018\u0003\ry'oZ\u0005\u0003sQ\u0012a\u0001T8hO\u0016\u0014\u0018\u0001\u00027pO\u0002\n!BZ;ukJ,Gi\u001c8f+\u0005i\u0004c\u0001 B\u00076\tqH\u0003\u0002AS\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\u0005\t{$A\u0002$viV\u0014XM\u0004\u0002E\u000b6\t\u0011%\u0003\u0002GC\u0005!Ai\u001c8f\u0003-1W\u000f^;sK\u0012{g.\u001a\u0011)\u0005\u0005I\u0005C\u0001&N\u001b\u0005Y%B\u0001'\"\u0003)\tgN\\8uCRLwN\\\u0005\u0003\u001d.\u00131\"\u00138uKJt\u0017\r\\!qS\"\u0012\u0001!S\n\u0004\u000f\u001d\n\u0006C\u0001*V\u001b\u0005\u0019&B\u0001+\u001c\u0003\u0015\u0001(o\u001c;p\u0013\t16K\u0001\u000fFm\u0016tG\u000f\u0015:pIV\u001cWM]*feZL7-\u001a)po\u0016\u0014\u0018\t]5\u0002\rML8\u000f^3na\tI6\rE\u0002[?\u0006l\u0011a\u0017\u0006\u00039v\u000bQ\u0001^=qK\u0012T!AX\u0011\u0002\u000b\u0005\u001cGo\u001c:\n\u0005\u0001\\&aC!di>\u00148+_:uK6\u0004\"AY2\r\u0001\u0011IA\rCA\u0001\u0002\u0003\u0015\t!\u001a\u0002\u0004?\u0012\n\u0014C\u00014j!\tAs-\u0003\u0002iS\t9aj\u001c;iS:<\u0007C\u0001\u0015k\u0013\tY\u0017FA\u0002B]f\f\u0011$\u001a<f]R\u001c()_*mS\u000e,7\u000fU3s'R\u0014X-Y7JIB!a.\u001e=|\u001d\ty7\u000f\u0005\u0002qS5\t\u0011O\u0003\u0002s]\u00051AH]8pizJ!\u0001^\u0015\u0002\rA\u0013X\rZ3g\u0013\t1xOA\u0002NCBT!\u0001^\u0015\u0011\u00059L\u0018B\u0001>x\u0005\u0019\u0019FO]5oOB\u0019A0!\u0003\u000e\u0003uT!A`@\u0002\u0011M\u001c\u0017\r\\1eg2T1\u0001XA\u0001\u0015\u0011\t\u0019!!\u0002\u0002\u000bE,XM]=\u000b\u0007\u0005\u001d\u0011%A\u0006qKJ\u001c\u0018n\u001d;f]\u000e,\u0017bAA\u0006{\n\u0011RI^3oiN\u0014\u0015p\u00157jG\u0016\fV/\u001a:z\u00039*g/\u001a8ug\nK8\u000b\\5dKN\u001cF/\u0019:uS:<gI]8n':\f\u0007o\u001d5piN\u0004VM]*ue\u0016\fW.\u00133\u0011\u000b9,\b0!\u0005\u0011\u0007q\f\u0019\"C\u0002\u0002\u0016u\u0014q%\u0012<f]R\u001c()_*mS\u000e,7\u000b^1si&twM\u0012:p[Ns\u0017\r]:i_R\u001c\u0018+^3ss\u000693-\u001e:sK:$XI^3oiN\u0014\u0015\u0010U3sg&\u001cH/\u001a8dK&#\u0007+\u001a:TiJ,\u0017-\\%e!\u0015qW\u000f_A\u000e!\ra\u0018QD\u0005\u0004\u0003?i(AJ\"veJ,g\u000e^#wK:$8OQ=QKJ\u001c\u0018n\u001d;f]\u000e,\u0017\n\u001a+za\u0016$\u0017+^3ss\u0006Y4-\u001e:sK:$XI^3oiN\u0014\u0015\u0010U3sg&\u001cH/\u001a8dK&#7\u000b^1si&twM\u0012:p[Ns\u0017\r]:i_R\u0004VM]*ue\u0016\fW.\u00133\u0011\u000b9,\b0!\n\u0011\u0007q\f9#C\u0002\u0002*u\u0014QgQ;se\u0016tG/\u0012<f]R\u001c()\u001f)feNL7\u000f^3oG\u0016LEm\u0015;beRLgn\u001a$s_6\u001cf.\u00199tQ>$\u0018+^3ss\u000691o\\;sG\u0016\u001c\b#\u00028\u00020\u0005M\u0012bAA\u0019o\n\u00191+\u001a;\u0011\t\u0005U\u00121\t\b\u0005\u0003o\ty$\u0004\u0002\u0002:)\u0019a0a\u000f\u000b\u0007\u0005uR$\u0001\u0005qe>$WoY3s\u0013\u0011\t\t%!\u000f\u0002\u001b\u00153XM\u001c;Qe>$WoY3s\u0013\u0011\t)%a\u0012\u0003'\u00153XM\u001c;Qe>$WoY3s'>,(oY3\u000b\t\u0005\u0005\u0013\u0011H\u0001\fS:$XM]2faR|'\u000fE\u0003)\u0003\u001b\n\t&C\u0002\u0002P%\u0012aa\u00149uS>t\u0007\u0003BA\u001c\u0003'JA!!\u0016\u0002:\tARI^3oiB\u0013x\u000eZ;dKJLe\u000e^3sG\u0016\u0004Ho\u001c:\u0015!\u0005e\u00131LA3\u0003O\nI'a\u001b\u0002n\u0005=\u0004C\u0001\u0013\b\u0011\u00199v\u00021\u0001\u0002^A\"\u0011qLA2!\u0011Qv,!\u0019\u0011\u0007\t\f\u0019\u0007\u0002\u0006e\u00037\n\t\u0011!A\u0003\u0002\u0015DQ\u0001\\\bA\u00025Dq!!\u0004\u0010\u0001\u0004\ty\u0001C\u0004\u0002\u0018=\u0001\r!!\u0007\t\u000f\u0005\u0005r\u00021\u0001\u0002$!9\u00111F\bA\u0002\u00055\u0002bBA%\u001f\u0001\u0007\u00111J\u0001\u0016aJ|Go\\!osN+'/[1mSj\fG/[8o+\t\t)\bE\u0002%\u0003oJ1!!\u001f\u001c\u0005U\u0001&o\u001c;p\u0003:L8+\u001a:jC2L'0\u0019;j_:\fa\u0003\u001d:pi>\fe._*fe&\fG.\u001b>bi&|g\u000eI\u0001\u0014gR\u0014X-Y7JIR{7k\\;sG\u0016l\u0015\r]\u000b\u0003\u0003\u0003\u0003RA\\;y\u0003g\tAc\u001d;sK\u0006l\u0017\n\u001a+p'>,(oY3NCB\u0004\u0013!C5oi\u0016\u00148-\u001a9u)\u0019\tI)!%\u0002\u0016B!a(QAF!\r!\u0015QR\u0005\u0004\u0003\u001f\u000b#\u0001\u0002#p]\u0016Da!a%\u0015\u0001\u0004A\u0018\u0001C:ue\u0016\fW.\u00133\t\u000f\u0005]E\u00031\u0001\u0002\u001a\u0006AQ.\u001a;bI\u0006$\u0018\r\u0005\u0003\u0002\u001c\u0006\u0005VBAAO\u0015\rq\u0018q\u0014\u0006\u0003=\u0005JA!a)\u0002\u001e\nAQ*\u001a;bI\u0006$\u0018-\u0001\ffm\u0016tG\u000f\u0015:pIV\u001cWM]*pkJ\u001cWMR8s)\u0011\t\u0019$!+\t\r\u0005MU\u00031\u0001y\u00039)g/\u001a8ug\nK8\u000b\\5dKN$b!a,\u0002J\u0006U\u0007\u0003CAY\u0003s\u000bi,a1\u000e\u0005\u0005M&b\u0001@\u00026*\u0019\u0011qW\u0011\u0002\rM$(/Z1n\u0013\u0011\tY,a-\u0003\rM{WO]2f!\r\u0011\u0016qX\u0005\u0004\u0003\u0003\u001c&!C*ue\u0016\fWnT;u!\r!\u0015QY\u0005\u0004\u0003\u000f\f#a\u0002(piV\u001bX\r\u001a\u0005\b\u0003\u00174\u0002\u0019AAg\u0003\tIg\u000e\u0005\u0005\u00022\u0006e\u0016qZAb!\r\u0011\u0016\u0011[\u0005\u0004\u0003'\u001c&\u0001C*ue\u0016\fW.\u00138\t\u000f\u0005]e\u00031\u0001\u0002\u001a\u0006\t\"/\u001e8Fm\u0016tGo\u001d\"z'2L7-Z:\u0015\r\u0005m\u0017\u0011]Av!)\t\t,!8\u0002P\u0006u\u00161Y\u0005\u0005\u0003?\f\u0019L\u0001\u0003GY><\bbBAr/\u0001\u0007\u0011Q]\u0001\u0005S:LG\u000fE\u0002S\u0003OL1!!;T\u0005\u001dIe.\u001b;SKFDq!a&\u0018\u0001\u0004\tI*\u0001\bfm\u0016tG\u000fV5nKN$\u0018-\u001c9\u0015\r\u0005E\u0018\u0011 B\u0002!\u0011q\u0014)a=\u0011\u0007I\u000b)0C\u0002\u0002xN\u0013a#\u0012<f]R$\u0016.\\3ti\u0006l\u0007OU3ta>t7/\u001a\u0005\b\u0003wD\u0002\u0019AA\u007f\u0003\r\u0011X-\u001d\t\u0004%\u0006}\u0018b\u0001B\u0001'\n)RI^3oiRKW.Z:uC6\u0004(+Z9vKN$\bbBAL1\u0001\u0007\u0011\u0011T\u0001\nY>\fG-\u0012<f]R$bA!\u0003\u0003\u0012\te\u0001\u0003\u0002 B\u0005\u0017\u00012A\u0015B\u0007\u0013\r\u0011ya\u0015\u0002\u0012\u0019>\fG-\u0012<f]R\u0014Vm\u001d9p]N,\u0007bBA~3\u0001\u0007!1\u0003\t\u0004%\nU\u0011b\u0001B\f'\n\u0001Bj\\1e\u000bZ,g\u000e\u001e*fcV,7\u000f\u001e\u0005\b\u0003/K\u0002\u0019AAMQ\t9\u0011\n")
@InternalApi
/* loaded from: input_file:akka/projection/grpc/internal/EventProducerServiceImpl.class */
public class EventProducerServiceImpl implements EventProducerServicePowerApi {
    private final ActorSystem<?> system;
    private final Map<String, EventsBySliceQuery> eventsBySlicesPerStreamId;
    private final Map<String, EventsBySliceStartingFromSnapshotsQuery> eventsBySlicesStartingFromSnapshotsPerStreamId;
    private final Map<String, CurrentEventsByPersistenceIdTypedQuery> currentEventsByPersistenceIdPerStreamId;
    private final Map<String, CurrentEventsByPersistenceIdStartingFromSnapshotQuery> currentEventsByPersistenceIdStartingFromSnapshotPerStreamId;
    private final Option<EventProducerInterceptor> interceptor;
    private final ProtoAnySerialization protoAnySerialization;
    private final Map<String, EventProducer.EventProducerSource> streamIdToSourceMap;

    public static Logger log() {
        return EventProducerServiceImpl$.MODULE$.log();
    }

    private ProtoAnySerialization protoAnySerialization() {
        return this.protoAnySerialization;
    }

    private Map<String, EventProducer.EventProducerSource> streamIdToSourceMap() {
        return this.streamIdToSourceMap;
    }

    private Future<Done> intercept(String str, Metadata metadata) {
        Some some = this.interceptor;
        if (some instanceof Some) {
            return ((EventProducerInterceptor) some.value()).intercept(str, metadata);
        }
        if (None$.MODULE$.equals(some)) {
            return EventProducerServiceImpl$.MODULE$.akka$projection$grpc$internal$EventProducerServiceImpl$$futureDone();
        }
        throw new MatchError(some);
    }

    private EventProducer.EventProducerSource eventProducerSourceFor(String str) {
        return (EventProducer.EventProducerSource) streamIdToSourceMap().getOrElse(str, () -> {
            throw new GrpcServiceException(Status.NOT_FOUND.withDescription(new StringBuilder(45).append("Stream id [").append(str).append("] is not available for consumption").toString()));
        });
    }

    @Override // akka.projection.grpc.internal.proto.EventProducerServicePowerApi
    public Source<StreamOut, NotUsed> eventsBySlices(Source<StreamIn, NotUsed> source, Metadata metadata) {
        return source.prefixAndTail(1).flatMapConcat(tuple2 -> {
            Seq seq;
            StreamIn streamIn;
            Seq seq2;
            StreamIn streamIn2;
            if (tuple2 != null) {
                Seq seq3 = (Seq) tuple2._1();
                Source source2 = (Source) tuple2._2();
                if (seq3 != null) {
                    SeqOps unapplySeq = package$.MODULE$.Seq().unapplySeq(seq3);
                    if (!SeqFactory$UnapplySeqWrapper$.MODULE$.isEmpty$extension(unapplySeq) && new SeqFactory.UnapplySeqWrapper(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq)) != null && SeqFactory$UnapplySeqWrapper$.MODULE$.lengthCompare$extension(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq), 1) == 0 && (streamIn2 = (StreamIn) SeqFactory$UnapplySeqWrapper$.MODULE$.apply$extension(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq), 0)) != null) {
                        StreamIn.Message message = streamIn2.message();
                        if (message instanceof StreamIn.Message.Init) {
                            return source2.via(this.runEventsBySlices(((StreamIn.Message.Init) message).m300value(), metadata));
                        }
                    }
                }
            }
            if (tuple2 != null && (seq2 = (Seq) tuple2._1()) != null) {
                SeqOps unapplySeq2 = package$.MODULE$.Seq().unapplySeq(seq2);
                if (!SeqFactory$UnapplySeqWrapper$.MODULE$.isEmpty$extension(unapplySeq2) && new SeqFactory.UnapplySeqWrapper(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq2)) != null && SeqFactory$UnapplySeqWrapper$.MODULE$.lengthCompare$extension(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq2), 0) == 0) {
                    EventProducerServiceImpl$.MODULE$.log().warn("Event stream closed before init.");
                    return Source$.MODULE$.empty();
                }
            }
            if (tuple2 != null && (seq = (Seq) tuple2._1()) != null) {
                SeqOps unapplySeq3 = package$.MODULE$.Seq().unapplySeq(seq);
                if (!SeqFactory$UnapplySeqWrapper$.MODULE$.isEmpty$extension(unapplySeq3) && new SeqFactory.UnapplySeqWrapper(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq3)) != null && SeqFactory$UnapplySeqWrapper$.MODULE$.lengthCompare$extension(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq3), 1) == 0 && (streamIn = (StreamIn) SeqFactory$UnapplySeqWrapper$.MODULE$.apply$extension(SeqFactory$UnapplySeqWrapper$.MODULE$.get$extension(unapplySeq3), 0)) != null) {
                    throw new IllegalArgumentException(new StringBuilder(64).append("Expected init message for eventsBySlices stream, ").append("but received [").append(streamIn.message().getClass().getName()).append("]").toString());
                }
            }
            if (tuple2 != null) {
                throw new IllegalStateException(new StringBuilder(39).append("Unexpected Seq prefix with [").append(((Seq) tuple2._1()).size()).append("] elements.").toString());
            }
            throw new MatchError(tuple2);
        });
    }

    private Flow<StreamIn, StreamOut, NotUsed> runEventsBySlices(InitReq initReq, Metadata metadata) {
        return Flow$.MODULE$.futureFlow(intercept(initReq.streamId(), metadata).map(done -> {
            Source failed;
            Function2 function2;
            EventProducer.EventProducerSource eventProducerSourceFor = this.eventProducerSourceFor(initReq.streamId());
            Instant protocolOffsetToOffset = ProtobufProtocolConversions$.MODULE$.protocolOffsetToOffset(initReq.offset());
            package$LoggerOps$ package_loggerops_ = package$LoggerOps$.MODULE$;
            Logger LoggerOps = akka.actor.typed.scaladsl.package$.MODULE$.LoggerOps(EventProducerServiceImpl$.MODULE$.log());
            ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
            Object[] objArr = new Object[5];
            objArr[0] = eventProducerSourceFor.streamId();
            objArr[1] = eventProducerSourceFor.entityType();
            objArr[2] = BoxesRunTime.boxToInteger(initReq.sliceMin());
            objArr[3] = BoxesRunTime.boxToInteger(initReq.sliceMax());
            objArr[4] = protocolOffsetToOffset instanceof TimestampOffset ? ((TimestampOffset) protocolOffsetToOffset).timestamp() : protocolOffsetToOffset;
            package_loggerops_.debugN$extension(LoggerOps, "Starting eventsBySlices stream [{}], [{}], slices [{} - {}], offset [{}]", scalaRunTime$.genericWrapArray(objArr));
            Some some = this.eventsBySlicesPerStreamId.get(initReq.streamId());
            if (some instanceof Some) {
                failed = ((EventsBySliceQuery) some.value()).eventsBySlices(eventProducerSourceFor.entityType(), initReq.sliceMin(), initReq.sliceMax(), protocolOffsetToOffset);
            } else {
                if (!None$.MODULE$.equals(some)) {
                    throw new MatchError(some);
                }
                Some some2 = this.eventsBySlicesStartingFromSnapshotsPerStreamId.get(initReq.streamId());
                if (some2 instanceof Some) {
                    failed = ((EventsBySliceStartingFromSnapshotsQuery) some2.value()).eventsBySlicesStartingFromSnapshots(eventProducerSourceFor.entityType(), initReq.sliceMin(), initReq.sliceMax(), protocolOffsetToOffset, (Function1) ((EventProducer.EventProducerSource) this.streamIdToSourceMap().apply(initReq.streamId())).transformSnapshot().get());
                } else {
                    if (!None$.MODULE$.equals(some2)) {
                        throw new MatchError(some2);
                    }
                    failed = Source$.MODULE$.failed(new IllegalArgumentException(new StringBuilder(50).append("No events by slices query defined for stream id [").append(initReq.streamId()).append("]").toString()));
                }
            }
            Source source = failed;
            Some some3 = this.currentEventsByPersistenceIdPerStreamId.get(initReq.streamId());
            if (some3 instanceof Some) {
                CurrentEventsByPersistenceIdTypedQuery currentEventsByPersistenceIdTypedQuery = (CurrentEventsByPersistenceIdTypedQuery) some3.value();
                function2 = (str, obj) -> {
                    return currentEventsByPersistenceIdTypedQuery.currentEventsByPersistenceIdTyped(str, BoxesRunTime.unboxToLong(obj), Long.MAX_VALUE);
                };
            } else {
                if (!None$.MODULE$.equals(some3)) {
                    throw new MatchError(some3);
                }
                Some some4 = this.currentEventsByPersistenceIdStartingFromSnapshotPerStreamId.get(initReq.streamId());
                if (some4 instanceof Some) {
                    CurrentEventsByPersistenceIdStartingFromSnapshotQuery currentEventsByPersistenceIdStartingFromSnapshotQuery = (CurrentEventsByPersistenceIdStartingFromSnapshotQuery) some4.value();
                    Function1 function1 = (Function1) ((EventProducer.EventProducerSource) this.streamIdToSourceMap().apply(initReq.streamId())).transformSnapshot().get();
                    function2 = (str2, obj2) -> {
                        return currentEventsByPersistenceIdStartingFromSnapshotQuery.currentEventsByPersistenceIdStartingFromSnapshot(str2, BoxesRunTime.unboxToLong(obj2), Long.MAX_VALUE, function1);
                    };
                } else {
                    if (!None$.MODULE$.equals(some4)) {
                        throw new MatchError(some4);
                    }
                    function2 = (str3, obj3) -> {
                        return $anonfun$runEventsBySlices$4(initReq, str3, BoxesRunTime.unboxToLong(obj3));
                    };
                }
            }
            return BidiFlow$.MODULE$.fromGraph(new FilterStage(initReq.streamId(), eventProducerSourceFor.entityType(), RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(initReq.sliceMin()), initReq.sliceMax()), initReq.filter(), function2, eventProducerSourceFor.producerFilter(), eventProducerSourceFor.settings().topicTagPrefix(), eventProducerSourceFor.settings().replayParallelism())).join(Flow$.MODULE$.fromSinkAndSource(Sink$.MODULE$.ignore(), source)).mapAsync(eventProducerSourceFor.settings().transformationParallelism(), eventEnvelope -> {
                return ProtobufProtocolConversions$.MODULE$.transformAndEncodeEvent(eventProducerSourceFor.transformation(), eventEnvelope, this.protoAnySerialization(), this.system.executionContext()).map(option -> {
                    if (option instanceof Some) {
                        Event event = (Event) ((Some) option).value();
                        package$LoggerOps$.MODULE$.traceN$extension(akka.actor.typed.scaladsl.package$.MODULE$.LoggerOps(EventProducerServiceImpl$.MODULE$.log()), "Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()), eventEnvelope.offset(), event.source()}));
                        return new StreamOut(new StreamOut.Message.Event(event), StreamOut$.MODULE$.apply$default$2());
                    }
                    if (!None$.MODULE$.equals(option)) {
                        throw new MatchError(option);
                    }
                    package$LoggerOps$.MODULE$.traceN$extension(akka.actor.typed.scaladsl.package$.MODULE$.LoggerOps(EventProducerServiceImpl$.MODULE$.log()), "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()), eventEnvelope.offset(), eventEnvelope.source()}));
                    return new StreamOut(new StreamOut.Message.FilteredEvent(new FilteredEvent(eventEnvelope.persistenceId(), eventEnvelope.sequenceNr(), eventEnvelope.slice(), ProtobufProtocolConversions$.MODULE$.offsetToProtoOffset(eventEnvelope.offset()), FilteredEvent$.MODULE$.apply$default$5(), FilteredEvent$.MODULE$.apply$default$6())), StreamOut$.MODULE$.apply$default$2());
                }, this.system.executionContext());
            });
        }, this.system.executionContext())).mapMaterializedValue(future -> {
            return NotUsed$.MODULE$;
        });
    }

    @Override // akka.projection.grpc.internal.proto.EventProducerServicePowerApi
    public Future<EventTimestampResponse> eventTimestamp(EventTimestampRequest eventTimestampRequest, Metadata metadata) {
        return intercept(eventTimestampRequest.streamId(), metadata).flatMap(done -> {
            EventProducer.EventProducerSource eventProducerSource = (EventProducer.EventProducerSource) this.streamIdToSourceMap().apply(eventTimestampRequest.streamId());
            String extractEntityType = PersistenceId$.MODULE$.extractEntityType(eventTimestampRequest.persistenceId());
            String entityType = eventProducerSource.entityType();
            if (extractEntityType != null ? !extractEntityType.equals(entityType) : entityType != null) {
                throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(new StringBuilder(145).append("Persistence id is for a type of entity that is not available for consumption (expected type ").append(" in persistence id for stream id [").append(eventTimestampRequest.streamId()).append("] is [").append(eventProducerSource.entityType()).append("] but was [").append(extractEntityType).append("])").toString()));
            }
            EventTimestampQuery eventTimestampQuery = (EventsBySliceQuery) this.eventsBySlicesPerStreamId.apply(eventTimestampRequest.streamId());
            return eventTimestampQuery instanceof EventTimestampQuery ? eventTimestampQuery.timestampOf(eventTimestampRequest.persistenceId(), eventTimestampRequest.seqNr()).map(option -> {
                if (option instanceof Some) {
                    return new EventTimestampResponse(new Some(Timestamp$.MODULE$.apply((Instant) ((Some) option).value())), EventTimestampResponse$.MODULE$.apply$default$2());
                }
                if (None$.MODULE$.equals(option)) {
                    return EventTimestampResponse$.MODULE$.m145defaultInstance();
                }
                throw new MatchError(option);
            }, this.system.executionContext()) : Future$.MODULE$.failed(new UnsupportedOperationException(new StringBuilder(34).append("eventTimestamp not supported by [").append(eventTimestampQuery.getClass().getName()).append("]").toString()));
        }, this.system.executionContext());
    }

    @Override // akka.projection.grpc.internal.proto.EventProducerServicePowerApi
    public Future<LoadEventResponse> loadEvent(LoadEventRequest loadEventRequest, Metadata metadata) {
        return intercept(loadEventRequest.streamId(), metadata).flatMap(done -> {
            EventProducer.EventProducerSource eventProducerSourceFor = this.eventProducerSourceFor(loadEventRequest.streamId());
            String extractEntityType = PersistenceId$.MODULE$.extractEntityType(loadEventRequest.persistenceId());
            String entityType = eventProducerSourceFor.entityType();
            if (extractEntityType != null ? !extractEntityType.equals(entityType) : entityType != null) {
                throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(new StringBuilder(145).append("Persistence id is for a type of entity that is not available for consumption (expected type ").append(" in persistence id for stream id [").append(loadEventRequest.streamId()).append("] is [").append(eventProducerSourceFor.entityType()).append("] but was [").append(extractEntityType).append("])").toString()));
            }
            LoadEventQuery loadEventQuery = (EventsBySliceQuery) this.eventsBySlicesPerStreamId.apply(loadEventRequest.streamId());
            return loadEventQuery instanceof LoadEventQuery ? loadEventQuery.loadEnvelope(loadEventRequest.persistenceId(), loadEventRequest.seqNr()).flatMap(eventEnvelope -> {
                return ProtobufProtocolConversions$.MODULE$.transformAndEncodeEvent(eventProducerSourceFor.transformation(), eventEnvelope, this.protoAnySerialization(), this.system.executionContext()).map(option -> {
                    if (option instanceof Some) {
                        Event event = (Event) ((Some) option).value();
                        package$LoggerOps$.MODULE$.traceN$extension(akka.actor.typed.scaladsl.package$.MODULE$.LoggerOps(EventProducerServiceImpl$.MODULE$.log()), "Loaded event from persistenceId [{}] with seqNr [{}], offset [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()), eventEnvelope.offset()}));
                        return new LoadEventResponse(new LoadEventResponse.Message.Event(event), LoadEventResponse$.MODULE$.apply$default$2());
                    }
                    if (!None$.MODULE$.equals(option)) {
                        throw new MatchError(option);
                    }
                    package$LoggerOps$.MODULE$.traceN$extension(akka.actor.typed.scaladsl.package$.MODULE$.LoggerOps(EventProducerServiceImpl$.MODULE$.log()), "Filtered loaded event from persistenceId [{}] with seqNr [{}], offset [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()), eventEnvelope.offset()}));
                    return new LoadEventResponse(new LoadEventResponse.Message.FilteredEvent(new FilteredEvent(eventEnvelope.persistenceId(), eventEnvelope.sequenceNr(), eventEnvelope.slice(), ProtobufProtocolConversions$.MODULE$.offsetToProtoOffset(eventEnvelope.offset()), eventEnvelope.source(), FilteredEvent$.MODULE$.apply$default$6())), LoadEventResponse$.MODULE$.apply$default$2());
                }, this.system.executionContext());
            }, this.system.executionContext()).recoverWith(new EventProducerServiceImpl$$anonfun$$nestedInanonfun$loadEvent$1$1(null), this.system.executionContext()) : Future$.MODULE$.failed(new UnsupportedOperationException(new StringBuilder(29).append("loadEvent not supported by [").append(loadEventQuery.getClass().getName()).append("]").toString()));
        }, this.system.executionContext());
    }

    public static final /* synthetic */ void $anonfun$new$2(EventProducerServiceImpl eventProducerServiceImpl, EventProducer.EventProducerSource eventProducerSource) {
        Predef$.MODULE$.require(StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(eventProducerSource.streamId())), () -> {
            return new StringBuilder(64).append("EventProducerSource for [").append(eventProducerSource.entityType()).append("] contains empty stream id, not allowed").toString();
        });
        Predef$.MODULE$.require(eventProducerServiceImpl.eventsBySlicesPerStreamId.contains(eventProducerSource.streamId()) || eventProducerServiceImpl.eventsBySlicesStartingFromSnapshotsPerStreamId.contains(eventProducerSource.streamId()), () -> {
            return new StringBuilder(50).append("No events by slices query defined for stream id [").append(eventProducerSource.streamId()).append("]").toString();
        });
    }

    public static final /* synthetic */ Source $anonfun$runEventsBySlices$4(InitReq initReq, String str, long j) {
        return Source$.MODULE$.failed(new IllegalArgumentException(new StringBuilder(62).append("No currentEventsByPersistenceId query defined for stream id [").append(initReq.streamId()).append("]").toString()));
    }

    public EventProducerServiceImpl(ActorSystem<?> actorSystem, Map<String, EventsBySliceQuery> map, Map<String, EventsBySliceStartingFromSnapshotsQuery> map2, Map<String, CurrentEventsByPersistenceIdTypedQuery> map3, Map<String, CurrentEventsByPersistenceIdStartingFromSnapshotQuery> map4, Set<EventProducer.EventProducerSource> set, Option<EventProducerInterceptor> option) {
        this.system = actorSystem;
        this.eventsBySlicesPerStreamId = map;
        this.eventsBySlicesStartingFromSnapshotsPerStreamId = map2;
        this.currentEventsByPersistenceIdPerStreamId = map3;
        this.currentEventsByPersistenceIdStartingFromSnapshotPerStreamId = map4;
        this.interceptor = option;
        Predef$.MODULE$.require(set.nonEmpty(), () -> {
            return "Empty set of EventProducerSource passed to EventProducerService, must contain at least one";
        });
        set.foreach(eventProducerSource -> {
            $anonfun$new$2(this, eventProducerSource);
            return BoxedUnit.UNIT;
        });
        this.protoAnySerialization = new ProtoAnySerialization(actorSystem);
        this.streamIdToSourceMap = ((IterableOnceOps) set.map(eventProducerSource2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(eventProducerSource2.streamId()), eventProducerSource2);
        })).toMap($less$colon$less$.MODULE$.refl());
        EventProducerServiceImpl$.MODULE$.log().info("Event producer gRPC service created with available sources [{}]", ((IterableOnceOps) set.map(eventProducerSource3 -> {
            return new StringBuilder(32).append("(stream id: [").append(eventProducerSource3.streamId()).append("], entity type: [").append(eventProducerSource3.entityType()).append("])").toString();
        })).mkString(", "));
    }
}
