package akka.projection.grpc.replication.internal;

import akka.Done;
import akka.Done$;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.scaladsl.package$;
import akka.actor.typed.scaladsl.package$LoggerOps$;
import akka.annotation.InternalApi;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.sharding.typed.ReplicatedEntity;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings$;
import akka.cluster.sharding.typed.scaladsl.ClusterSharding;
import akka.cluster.sharding.typed.scaladsl.ClusterSharding$;
import akka.cluster.sharding.typed.scaladsl.EntityRef;
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess$;
import akka.persistence.Persistence$;
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
import akka.persistence.typed.ReplicationId$;
import akka.persistence.typed.internal.PublishedEventImpl;
import akka.persistence.typed.internal.ReplicatedEventMetadata;
import akka.persistence.typed.internal.ReplicatedPublishedEventMetaData;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionBehavior$;
import akka.projection.ProjectionContext;
import akka.projection.ProjectionId;
import akka.projection.ProjectionId$;
import akka.projection.eventsourced.scaladsl.EventSourcedProvider$;
import akka.projection.grpc.consumer.GrpcQuerySettings;
import akka.projection.grpc.consumer.GrpcQuerySettings$;
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal;
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal$;
import akka.projection.grpc.producer.scaladsl.EventProducer;
import akka.projection.grpc.producer.scaladsl.EventProducer$;
import akka.projection.grpc.producer.scaladsl.EventProducer$Transformation$;
import akka.projection.grpc.replication.scaladsl.Replica;
import akka.projection.grpc.replication.scaladsl.ReplicationSettings;
import akka.stream.scaladsl.FlowWithContext;
import akka.stream.scaladsl.FlowWithContext$;
import akka.util.Timeout;
import akka.util.Timeout$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.collection.StringOps$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.math.Ordering$Int$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReplicationImpl.scala */
@InternalApi
/* loaded from: input_file:akka/projection/grpc/replication/internal/ReplicationImpl$.class */
public final class ReplicationImpl$ {
    /** The single instance of this class; the static synthetic helpers below reach instance members through it. */
    public static final ReplicationImpl$ MODULE$ = new ReplicationImpl$();
    /** SLF4J logger; note it is looked up by {@code ReplicationImpl.class}, not by this {@code $} class. */
    private static final Logger log = LoggerFactory.getLogger(ReplicationImpl.class);
    /** Already-completed future holding {@code None$.MODULE$}, created once and handed out by {@link #filteredEvent()}. */
    private static final Future<None$> filteredEvent = Future$.MODULE$.successful(None$.MODULE$);

    /** Returns the class-level SLF4J logger held in the static {@code log} field. */
    private Logger log() {
        return log;
    }

    /** Returns the shared, already-completed {@code None} future held in the static {@code filteredEvent} field. */
    private Future<None$> filteredEvent() {
        return filteredEvent;
    }

    /**
     * Sets up gRPC based replication for one replicated entity type.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>requires that the actor system's provider is a {@code ClusterActorRefProvider};</li>
     *   <li>creates an event producer source whose mapper passes on only events whose origin replica equals
     *       {@code replicationSettings.selfReplicaId()} and answers with the shared "filtered" future for all
     *       others;</li>
     *   <li>initialises cluster sharding for the entity;</li>
     *   <li>starts a consumer for every replica in {@code replicationSettings.otherReplicas()};</li>
     *   <li>returns a {@code ReplicationImpl} holding the producer source, a supplier that builds the gRPC
     *       service handler for that source, the entity type key and the entity-ref lookup function.</li>
     * </ol>
     *
     * @param replicationSettings settings describing this replica, the other replicas and the producer side
     * @param replicatedEntity    the entity to initialise in cluster sharding and replicate
     * @param actorSystem         actor system used for sharding, consumers and the service handler
     * @return the assembled {@code ReplicationImpl}
     * @throws IllegalArgumentException from the mapper (not from this call) when an envelope carries no
     *                                  {@code ReplicatedEventMetadata}
     */
    public <Command, Event, State> ReplicationImpl<Command> grpcReplication(ReplicationSettings<Command> replicationSettings, ReplicatedEntity<Command> replicatedEntity, ActorSystem<?> actorSystem) {
        boolean clusterProvider = actorSystem.classicSystem().provider() instanceof ClusterActorRefProvider;
        Predef$.MODULE$.require(clusterProvider, () -> "Replicated Event Sourcing over gRPC only possible together with Akka cluster (akka.actor.provider = cluster)");

        EventProducer.EventProducerSource producerSource = new EventProducer.EventProducerSource(
                replicationSettings.entityTypeKey().name(),
                replicationSettings.streamId(),
                EventProducer$Transformation$.MODULE$.empty().registerAsyncEnvelopeOrElseMapper(eventEnvelope -> {
                    // Only Some(ReplicatedEventMetadata) is acceptable; anything else is rejected below.
                    Object metadataOption = eventEnvelope.eventMetadata();
                    Object metadata = metadataOption instanceof Some ? ((Some) metadataOption).value() : null;
                    if (!(metadata instanceof ReplicatedEventMetadata)) {
                        throw new IllegalArgumentException("Got an event without replication metadata, not supported (pid: " + eventEnvelope.persistenceId() + ", seq_nr: " + eventEnvelope.sequenceNr() + ")");
                    }
                    ReplicaId origin = ((ReplicatedEventMetadata) metadata).originReplica();
                    ReplicaId self = replicationSettings.selfReplicaId();
                    boolean originatedHere = origin == null ? self == null : origin.equals(self);
                    if (originatedHere) {
                        return Future$.MODULE$.successful(eventEnvelope.eventOption());
                    }
                    // Events that originated on another replica are not re-published from here.
                    return MODULE$.filteredEvent();
                }),
                replicationSettings.eventProducerSettings());

        ClusterSharding sharding = ClusterSharding$.MODULE$.apply(actorSystem);
        sharding.init(replicatedEntity.entity());
        Function1 entityRefFactory = entityId -> sharding.entityRefFor(replicatedEntity.entity().typeKey(), entityId);

        // One consumer per remote replica.
        replicationSettings.otherReplicas().foreach(replica -> {
            $anonfun$grpcReplication$4(replicationSettings, entityRefFactory, actorSystem, replica);
            return BoxedUnit.UNIT;
        });

        return new ReplicationImpl<>(producerSource, () -> {
            Set sources = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new EventProducer.EventProducerSource[]{producerSource}));
            return EventProducer$.MODULE$.grpcServiceHandler(sources, replicationSettings.eventProducerInterceptor(), actorSystem);
        }, replicatedEntity.entity().typeKey(), entityRefFactory);
    }

    /**
     * Starts the sharded daemon process whose instances consume events from one remote replica.
     *
     * <p>The projection name is {@code RES_<entity type name>_<self replica id>_<remote replica id>}. The
     * name may be up to and including 255 characters long, matching the "at most 255 characters" limit
     * stated in the failure message (the previous check rejected a name of exactly 255 characters).
     *
     * @param replica             the remote replica to consume from; supplies the number of consumers, the
     *                            optional cluster role, the gRPC client settings and optional request metadata
     * @param replicationSettings settings of the local replica (entity type key, self replica id, stream id,
     *                            ask timeout)
     * @param function1           lookup from entity id to the local {@code EntityRef}; passed on to each
     *                            consumer instance
     * @param actorSystem         actor system used for persistence slice ranges, the read journal and the
     *                            sharded daemon process
     * @throws IllegalArgumentException if the generated projection name is longer than 255 characters
     */
    private <C> void startConsumer(Replica replica, ReplicationSettings<C> replicationSettings, Function1<String, EntityRef<C>> function1, ActorSystem<?> actorSystem) {
        final int maxProjectionNameLength = 255;
        Timeout askTimeout = Timeout$.MODULE$.durationToTimeout(replicationSettings.entityEventReplicationTimeout());
        ExecutionContextExecutor executionContext = actorSystem.executionContext();

        String projectionName = "RES_" + replicationSettings.entityTypeKey().name() + "_" + replicationSettings.selfReplicaId().id() + "_" + replica.replicaId().id();
        // Inclusive bound: a name of exactly 255 characters is "at most 255" and must be accepted.
        Predef$.MODULE$.require(projectionName.length() <= maxProjectionNameLength, () -> {
            return "The generated projection name for replica [" + replica.replicaId().id() + "]: '" + projectionName + "' is too long to fit " + "in the database column, must be at most 255 characters. See if you can shorten replica or entity type names.";
        });

        // One slice range per consumer instance; instance i later picks range i.
        IndexedSeq sliceRanges = Persistence$.MODULE$.apply(actorSystem).sliceRanges(replica.numberOfConsumers());

        GrpcQuerySettings baseQuerySettings = GrpcQuerySettings$.MODULE$.apply(replicationSettings.streamId());
        GrpcQuerySettings querySettings = (GrpcQuerySettings) replica.additionalQueryRequestMetadata().fold(
                () -> baseQuerySettings,
                metadata -> baseQuerySettings.withAdditionalRequestMetadata(metadata));
        GrpcReadJournal readJournal = GrpcReadJournal$.MODULE$.apply(querySettings, replica.grpcClientSettings(), Nil$.MODULE$, actorSystem);

        package$LoggerOps$.MODULE$.infoN$extension(package$.MODULE$.LoggerOps(log()), "Starting {} projection streams{} consuming events for Replicated Entity [{}] from [{}] (at {}:{})", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{
                BoxesRunTime.boxToInteger(replica.numberOfConsumers()),
                replica.consumersOnClusterRole().fold(() -> "", role -> " on nodes with cluster role " + role),
                replicationSettings.entityTypeKey().name(),
                replica.replicaId().id(),
                replica.grpcClientSettings().serviceName(),
                BoxesRunTime.boxToInteger(replica.grpcClientSettings().defaultPort())}));

        // Restrict the daemon process to a cluster role only when the replica configuration names one.
        ShardedDaemonProcessSettings defaultSettings = ShardedDaemonProcessSettings$.MODULE$.apply(actorSystem);
        Object roleOption = replica.consumersOnClusterRole();
        ShardedDaemonProcessSettings daemonSettings;
        if (None$.MODULE$.equals(roleOption)) {
            daemonSettings = defaultSettings;
        } else if (roleOption instanceof Some) {
            daemonSettings = defaultSettings.withRole((String) ((Some) roleOption).value());
        } else {
            throw new MatchError(roleOption);
        }

        ShardedDaemonProcess$.MODULE$.apply(actorSystem).init(projectionName, replica.numberOfConsumers(), index -> {
            return $anonfun$startConsumer$6(sliceRanges, projectionName, replicationSettings, replica, function1, askTimeout, executionContext, actorSystem, readJournal, BoxesRunTime.unboxToInt(index));
        }, daemonSettings, None$.MODULE$, ClassTag$.MODULE$.apply(ProjectionBehavior.Command.class));
    }

    /**
     * Synthetic lambda body used by {@code grpcReplication}: starts the consumer for one remote replica by
     * delegating to {@code startConsumer} on the singleton instance.
     */
    public static final /* synthetic */ void $anonfun$grpcReplication$4(ReplicationSettings replicationSettings, Function1 function1, ActorSystem actorSystem, Replica replica) {
        MODULE$.startConsumer(replica, replicationSettings, function1, actorSystem);
    }

    /**
     * Synthetic lambda body: logs at WARN level, with the throwable attached, that forwarding an event on a
     * replication stream failed.
     *
     * @param str           first part of the stream identifier shown in the message
     * @param str2          second part of the stream identifier shown in the message
     * @param replica       the remote replica the event came from
     * @param eventEnvelope the envelope whose persistence id and sequence number are reported
     * @param th            the failure being logged
     */
    public static final /* synthetic */ void $anonfun$startConsumer$9(String str, String str2, Replica replica, EventEnvelope eventEnvelope, Throwable th) {
        String streamLabel = str + "/" + str2;
        String message = "Failing replication stream [" + streamLabel + "] from [" + replica.replicaId().id()
                + "], event pid [" + eventEnvelope.persistenceId()
                + "], seq_nr [" + eventEnvelope.sequenceNr() + "]";
        MODULE$.log().warn(message, th);
    }

    /**
     * Synthetic lambda body used by {@code startConsumer}: builds the projection behavior for consumer
     * instance {@code i}.
     *
     * <p>The instance takes slice range number {@code i} from {@code indexedSeq}, uses {@code "<min>-<max>"}
     * of that range as the projection key, and runs every incoming envelope through a
     * {@code ParallelUpdatesFlow} that forwards the event to the local entity via an ask.
     *
     * @param indexedSeq          slice ranges, indexed by consumer instance number
     * @param str                 projection name ({@code startConsumer} passes its generated name here)
     * @param replicationSettings settings of the local replica
     * @param replica             the remote replica being consumed from
     * @param function1           lookup from entity id to the local {@code EntityRef}
     * @param timeout             timeout for the ask to the local entity
     * @param executionContext    context on which the failure-logging callback runs
     * @param actorSystem         actor system handed to the source provider and projection provider
     * @param grpcReadJournal     read journal connected to the remote replica
     * @param i                   index of this consumer instance
     * @return the {@code ProjectionBehavior} for this instance
     */
    public static final /* synthetic */ Behavior $anonfun$startConsumer$6(IndexedSeq indexedSeq, String str, ReplicationSettings replicationSettings, Replica replica, Function1 function1, Timeout timeout, ExecutionContext executionContext, ActorSystem actorSystem, GrpcReadJournal grpcReadJournal, int i) {
        Range range = (Range) indexedSeq.apply(i);
        // Projection key: lowest and highest slice of this instance's range, joined by '-'.
        String sb = new StringBuilder(1).append(range.min(Ordering$Int$.MODULE$)).append("-").append(range.max(Ordering$Int$.MODULE$)).toString();
        ProjectionId apply = ProjectionId$.MODULE$.apply(str, sb);
        FlowWithContext<EventEnvelope<Object>, ProjectionContext, Done, ProjectionContext, NotUsed> flowWithContext = (FlowWithContext) FlowWithContext$.MODULE$.apply().via(new ParallelUpdatesFlow(replicationSettings.parallelUpdates(), eventEnvelope -> {
            // Envelopes flagged as filtered carry nothing to forward; complete immediately.
            if (eventEnvelope.filtered()) {
                if (MODULE$.log().isTraceEnabled()) {
                    package$LoggerOps$.MODULE$.traceN$extension(package$.MODULE$.LoggerOps(MODULE$.log()), "[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sb, replica.replicaId(), eventEnvelope.persistenceId(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr())}));
                }
                return Future$.MODULE$.successful(Done$.MODULE$);
            }
            // Only Some(ReplicatedEventMetadata) is handled; every other shape falls through to the throw below.
            Some eventMetadata = eventEnvelope.eventMetadata();
            if (eventMetadata instanceof Some) {
                Object value = eventMetadata.value();
                if (value instanceof ReplicatedEventMetadata) {
                    ReplicatedEventMetadata replicatedEventMetadata = (ReplicatedEventMetadata) value;
                    Predef$ predef$ = Predef$.MODULE$;
                    ReplicaId originReplica = replicatedEventMetadata.originReplica();
                    ReplicaId replicaId = replica.replicaId();
                    // The event must have originated on the replica this stream consumes from (null-safe equality).
                    predef$.require(originReplica != null ? originReplica.equals(replicaId) : replicaId == null);
                    // Same replication id as the incoming event, but with the replica part replaced by the local replica.
                    ReplicationId fromString = ReplicationId$.MODULE$.fromString(eventEnvelope.persistenceId());
                    ReplicationId withReplica = fromString.withReplica(replicationSettings.selfReplicaId());
                    EntityRef entityRef = (EntityRef) function1.apply(withReplica.entityId());
                    if (MODULE$.log().isTraceEnabled()) {
                        package$LoggerOps$.MODULE$.traceN$extension(package$.MODULE$.LoggerOps(MODULE$.log()), "[{}] forwarding event originating on dc [{}] to [{}] (origin seq_nr [{}]): [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sb, replicatedEventMetadata.originReplica(), withReplica.persistenceId().id(), BoxesRunTime.boxToLong(eventEnvelope.sequenceNr()), replicatedEventMetadata.version()}));
                    }
                    // The published event keeps the persistence id parsed from the incoming envelope (fromString),
                    // not the locally re-targeted one, and carries the ask's reply-to ref.
                    Future ask = entityRef.ask(actorRef -> {
                        return new PublishedEventImpl(fromString.persistenceId(), replicatedEventMetadata.originSequenceNr(), eventEnvelope.event(), eventEnvelope.timestamp(), new Some(new ReplicatedPublishedEventMetaData(replicatedEventMetadata.originReplica(), replicatedEventMetadata.version())), new Some(actorRef));
                    }, timeout);
                    // Log a failed ask as a side effect; the original future is still what gets returned.
                    ask.failed().foreach(th -> {
                        $anonfun$startConsumer$9(str, sb, replica, eventEnvelope, th);
                        return BoxedUnit.UNIT;
                    }, executionContext);
                    return ask;
                }
            }
            // NOTE(review): getClass() is called on the Option wrapper, so the message names the wrapper's
            // class rather than the class of the metadata inside it.
            throw new IllegalArgumentException(new StringBuilder(131).append("Got unexpected type of event envelope metadata: ").append(eventMetadata.getClass()).append(" (pid [").append(eventEnvelope.persistenceId()).append("], seq_nr [").append(eventEnvelope.sequenceNr()).append("]").append(", is the remote entity really a Replicated Event Sourced Entity?").toString());
        })).map(eventEnvelope2 -> {
            return Done$.MODULE$;
        });
        // The source is restricted to the same min/max slices that were used for the projection key above.
        return ProjectionBehavior$.MODULE$.apply(replicationSettings.projectionProvider().apply(apply, EventSourcedProvider$.MODULE$.eventsBySlices(actorSystem, grpcReadJournal, grpcReadJournal.streamId(), range.min(Ordering$Int$.MODULE$), range.max(Ordering$Int$.MODULE$)), flowWithContext, actorSystem));
    }

    /** Private so that the only instance is the one stored in {@code MODULE$}. */
    private ReplicationImpl$() {
    }
}
