package akka.projection.grpc.internal;

import akka.Done;
import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.typed.ActorSystem;
import akka.annotation.InternalApi;
import akka.grpc.scaladsl.Metadata;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.ProjectionContext;
import akka.projection.grpc.internal.FilterStage;
import akka.projection.grpc.internal.proto.ConsumeEventIn;
import akka.projection.grpc.internal.proto.ConsumeEventIn$;
import akka.projection.grpc.internal.proto.Event;
import akka.projection.grpc.internal.proto.EventConsumerServiceClient;
import akka.projection.grpc.internal.proto.FilteredEvent;
import akka.projection.grpc.internal.proto.FilteredEvent$;
import akka.projection.grpc.internal.proto.KeepAlive$;
import akka.projection.grpc.producer.scaladsl.EventProducer;
import akka.stream.Graph;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.FlowWithContext;
import akka.stream.scaladsl.FlowWithContext$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxesRunTime;

/* compiled from: EventPusher.scala */
/**
 * Singleton (Scala module) instance backing {@code EventPusher}, compiled from
 * {@code EventPusher.scala}.
 *
 * <p>Builds the flow that turns {@link EventEnvelope}s (paired with their
 * {@link ProjectionContext}) into {@link ConsumeEventIn} messages, optionally interleaves
 * keep-alive messages, and hands the result to an {@code EventPusherStage}.
 */
@InternalApi
/* loaded from: input_file:akka/projection/grpc/internal/EventPusher$.class */
public final class EventPusher$ {
    /** The singleton instance; assigned by the private constructor during static initialization. */
    public static EventPusher$ MODULE$;
    private final Logger logger;
    /** Pre-built keep-alive message paired with a {@code null} projection context. */
    private final Tuple2<ConsumeEventIn, ProjectionContext> KeepAliveTuple;

    static {
        // Instantiating the class publishes the instance through MODULE$ (see constructor).
        new EventPusher$();
    }

    private Logger logger() {
        return this.logger;
    }

    /**
     * Creates the event-pushing flow.
     *
     * <p>The flow is assembled per materialization from three parts, in order:
     * <ol>
     *   <li>the filter/transform flow from {@code filterAndTransformFlow$1}, which is only built
     *       once the promise created here has been completed,</li>
     *   <li>a keep-alive step that injects {@link #KeepAliveTuple()} at the configured interval,
     *       or a pass-through flow when that interval equals {@code Duration.Zero},</li>
     *   <li>an {@code EventPusherStage} that receives the same promise.</li>
     * </ol>
     *
     * @param str                        passed unchanged to {@code EventPusherStage}
     *                                   (NOTE(review): presumably an origin/stream id — confirm)
     * @param eventConsumerServiceClient client handed to {@code EventPusherStage}
     * @param eventProducerSource        supplies settings, producer filter and transformation
     * @param metadata                   gRPC metadata handed to {@code EventPusherStage}
     * @param actorSystem                provides the execution context and serialization setup
     * @return a flow-with-context from event envelopes to {@link Done}
     */
    public <Event> FlowWithContext<EventEnvelope<Event>, ProjectionContext, Done, ProjectionContext, NotUsed> apply(String str, EventConsumerServiceClient eventConsumerServiceClient, EventProducer.EventProducerSource eventProducerSource, Metadata metadata, ActorSystem<?> actorSystem) {
        ExecutionContextExecutor executionContext = actorSystem.executionContext();
        ProtoAnySerialization protoAnySerialization = new ProtoAnySerialization(actorSystem);
        return FlowWithContext$.MODULE$.fromTuples(Flow$.MODULE$.fromMaterializer((materializer, attributes) -> {
            // One promise per materialization: its future gates the filter flow, and the
            // promise itself is given to the stage.
            // NOTE(review): presumably the stage completes it — confirm in EventPusherStage.
            Promise promise = Promise$.MODULE$.apply();
            Flow via = Flow$.MODULE$.apply().via(filterAndTransformFlow$1(promise.future(), eventProducerSource, protoAnySerialization, executionContext));
            FiniteDuration keepAliveInterval = eventProducerSource.settings().keepAliveInterval();
            FiniteDuration zero = Duration$.MODULE$.Zero();
            // Null-safe equality check: an interval equal to Zero means no keep-alive is injected.
            return via.via((keepAliveInterval != null ? keepAliveInterval.equals(zero) : zero == null) ? Flow$.MODULE$.apply() : (Graph) Flow$.MODULE$.apply().keepAlive(eventProducerSource.settings().keepAliveInterval(), () -> {
                return MODULE$.KeepAliveTuple();
            })).via(Flow$.MODULE$.fromGraph(new EventPusherStage(str, eventProducerSource, eventConsumerServiceClient, metadata, promise)));
        }).mapMaterializedValue(future -> {
            return NotUsed$.MODULE$;
        }));
    }

    /**
     * Returns the shared keep-alive element: a {@link ConsumeEventIn} wrapping a
     * {@code KeepAlive} message, paired with a {@code null} {@link ProjectionContext}.
     */
    public Tuple2<ConsumeEventIn, ProjectionContext> KeepAliveTuple() {
        return this.KeepAliveTuple;
    }

    /**
     * Builds the flow that filters and encodes each envelope once {@code future} completes.
     *
     * <p>The completed value is fed into {@code updateFilterFromProto} on top of an empty filter
     * (NOTE(review): presumably the consumer's initial filter criteria — confirm). Each incoming
     * {@code (envelope, context)} tuple is then mapped to:
     * <ul>
     *   <li>an {@code Event} message when both the producer filter and the updated filter accept
     *       the envelope and the transformation yields a value, or</li>
     *   <li>a {@code FilteredEvent} message (persistence id, sequence number, slice, offset) when
     *       the envelope is rejected or the transformation yields {@code None}.</li>
     * </ul>
     * The projection context is carried through unchanged.
     *
     * @throws MatchError if a stream element is {@code null} or the transformation result is
     *                    neither {@code Some} nor {@code None}
     */
    private static final Flow filterAndTransformFlow$1(Future future, EventProducer.EventProducerSource eventProducerSource, ProtoAnySerialization protoAnySerialization, ExecutionContext executionContext) {
        return Flow$.MODULE$.futureFlow(future.map(protoCriteria -> {
            // The inner lambda parameter must not reuse the outer lambda's parameter name:
            // Java rejects redeclaring a name that is already in scope.
            FilterStage.Filter updateFilterFromProto = ProtobufProtocolConversions$.MODULE$.updateFilterFromProto(FilterStage$Filter$.MODULE$.empty(eventProducerSource.settings().topicTagPrefix()), protoCriteria, criteria -> {
                return (Seq) Predef$.MODULE$.identity(criteria);
            });
            return Flow$.MODULE$.apply().mapAsync(eventProducerSource.settings().transformationParallelism(), tuple2 -> {
                Future<Option<Event>> successful;
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                EventEnvelope<?> eventEnvelope = (EventEnvelope) tuple2._1();
                ProjectionContext projectionContext = (ProjectionContext) tuple2._2();
                if (BoxesRunTime.unboxToBoolean(eventProducerSource.producerFilter().apply(eventEnvelope)) && updateFilterFromProto.matches(eventEnvelope)) {
                    if (MODULE$.logger().isTraceEnabled()) {
                        MODULE$.logger().trace("Pushing event persistence id [{}], sequence number [{}]", eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()));
                    }
                    successful = ProtobufProtocolConversions$.MODULE$.transformAndEncodeEvent(eventProducerSource.transformation(), eventEnvelope, protoAnySerialization, executionContext);
                } else {
                    if (MODULE$.logger().isTraceEnabled()) {
                        MODULE$.logger().trace("Filtering event persistence id [{}], sequence number [{}]", eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()));
                    }
                    // Rejected envelopes are still emitted downstream, as FilteredEvent (see below).
                    successful = Future$.MODULE$.successful(None$.MODULE$);
                }
                return successful.map(option -> {
                    if (option instanceof Some) {
                        return new Tuple2(new ConsumeEventIn(new ConsumeEventIn.Message.Event((Event) ((Some) option).value()), ConsumeEventIn$.MODULE$.apply$default$2()), projectionContext);
                    }
                    if (None$.MODULE$.equals(option)) {
                        return new Tuple2(new ConsumeEventIn(new ConsumeEventIn.Message.FilteredEvent(new FilteredEvent(eventEnvelope.persistenceId(), eventEnvelope.sequenceNr(), eventEnvelope.slice(), ProtobufProtocolConversions$.MODULE$.offsetToProtoOffset(eventEnvelope.offset()), FilteredEvent$.MODULE$.apply$default$5(), FilteredEvent$.MODULE$.apply$default$6())), ConsumeEventIn$.MODULE$.apply$default$2()), projectionContext);
                    }
                    throw new MatchError(option);
                }, executionContext);
            });
        }, executionContext)).mapMaterializedValue(future2 -> {
            return NotUsed$.MODULE$;
        });
    }

    /**
     * Initializes the singleton: publishes {@code this} through {@link #MODULE$}, then creates the
     * logger and the shared keep-alive tuple.
     */
    private EventPusher$() {
        MODULE$ = this;
        this.logger = LoggerFactory.getLogger(getClass());
        // The null must be typed as ProjectionContext: an (Object) cast would make the diamond
        // infer a second type argument that is incompatible with the field's declared type.
        // NOTE(review): m224defaultInstance looks like a decompiler-renamed defaultInstance() — confirm.
        this.KeepAliveTuple = new Tuple2<>(new ConsumeEventIn(new ConsumeEventIn.Message.KeepAlive(KeepAlive$.MODULE$.m224defaultInstance()), ConsumeEventIn$.MODULE$.apply$default$2()), (ProjectionContext) null);
    }
}
