package org.apache.pekko.stream.connectors.sse.scaladsl;

import java.io.Serializable;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ClassicActorSystemProvider;
import org.apache.pekko.http.scaladsl.client.RequestBuilding$;
import org.apache.pekko.http.scaladsl.coding.Coders$;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.http.scaladsl.model.MediaRange;
import org.apache.pekko.http.scaladsl.model.MediaRange$;
import org.apache.pekko.http.scaladsl.model.MediaTypes$;
import org.apache.pekko.http.scaladsl.model.Uri;
import org.apache.pekko.http.scaladsl.model.headers.Accept$;
import org.apache.pekko.http.scaladsl.model.headers.Last$minusEvent$minusID$;
import org.apache.pekko.http.scaladsl.model.sse.ServerSentEvent;
import org.apache.pekko.http.scaladsl.model.sse.ServerSentEvent$;
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal$;
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshaller$;
import org.apache.pekko.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling$;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.SourceShape$;
import org.apache.pekko.stream.UniformFanInShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.scaladsl.Broadcast$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.FlowOps;
import org.apache.pekko.stream.scaladsl.GraphDSL;
import org.apache.pekko.stream.scaladsl.GraphDSL$;
import org.apache.pekko.stream.scaladsl.GraphDSL$Implicits$;
import org.apache.pekko.stream.scaladsl.Merge$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ScalaRunTime$;

/* compiled from: EventSource.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/sse/scaladsl/EventSource$.class */
public final class EventSource$ implements Serializable {
    public static final EventSource$ MODULE$ = new EventSource$();
    public static final Source<ServerSentEvent, NotUsed> org$apache$pekko$stream$connectors$sse$scaladsl$EventSource$$$noEvents = Source$.MODULE$.empty();
    private static final Source<ServerSentEvent, NotUsed> singleDelimiter = Source$.MODULE$.single(ServerSentEvent$.MODULE$.heartbeat());

    private EventSource$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(EventSource$.class);
    }

    public Source<ServerSentEvent, NotUsed> apply(Uri uri, Function1<HttpRequest, Future<HttpResponse>> function1, Option<String> option, FiniteDuration finiteDuration, ClassicActorSystemProvider classicActorSystemProvider) {
        ActorSystem classicSystem = classicActorSystemProvider.classicSystem();
        FlowOps mapAsync = Flow$.MODULE$.apply().mapAsync(1, option2 -> {
            return getEventSource$1(uri, function1, classicSystem, option2);
        });
        Function1 function12 = source -> {
            return recover$1(source);
        };
        Flow flatMapConcat = mapAsync.flatMapConcat(function12.andThen(source2 -> {
            return delimit$1(source2);
        }));
        Flow prepend = Flow$.MODULE$.apply().prepend(Source$.MODULE$.single(ServerSentEvent$.MODULE$.heartbeat()));
        Flow drop = prepend.sliding(2, prepend.sliding$default$2()).collect(new EventSource$$anon$2()).scan(option, (option3, serverSentEvent) -> {
            return serverSentEvent.id().orElse(() -> {
                return r1.$anonfun$5$$anonfun$1(r2);
            });
        }).drop(1L);
        return Source$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(builder -> {
            SourceShape add = builder.add(Source$.MODULE$.single(option));
            UniformFanInShape add2 = builder.add(Merge$.MODULE$.apply(2, Merge$.MODULE$.apply$default$2()));
            UniformFanOutShape add3 = builder.add(Broadcast$.MODULE$.apply(2, true));
            FlowShape add4 = builder.add(Flow$.MODULE$.apply().filter(serverSentEvent2 -> {
                ServerSentEvent heartbeat = ServerSentEvent$.MODULE$.heartbeat();
                return serverSentEvent2 != null ? !serverSentEvent2.equals(heartbeat) : heartbeat != null;
            }));
            Flow apply = Flow$.MODULE$.apply();
            FlowShape add5 = builder.add(apply.delay(finiteDuration, apply.delay$default$2()));
            new GraphDSL.Implicits.SourceShapeArrow(GraphDSL$Implicits$.MODULE$.SourceShapeArrow(add)).$tilde$greater(add2, builder).$tilde$greater(flatMapConcat, builder).$tilde$greater(add3, builder).$tilde$greater(add4, builder);
            new GraphDSL.Implicits.FanInOps(GraphDSL$Implicits$.MODULE$.FanInOps(add2)).$less$tilde(add5, builder).$less$tilde(drop, builder).$less$tilde(add3, builder);
            return SourceShape$.MODULE$.apply(add4.out());
        }));
    }

    public Option<String> apply$default$3() {
        return None$.MODULE$;
    }

    public FiniteDuration apply$default$4() {
        return Duration$.MODULE$.Zero();
    }

    private final Future getEventSource$1(Uri uri, Function1 function1, ActorSystem actorSystem, Option option) {
        return ((Future) function1.apply((HttpRequest) Option$.MODULE$.option2Iterable(option).foldLeft(RequestBuilding$.MODULE$.Get().apply(uri).addHeader(Accept$.MODULE$.apply(MediaRange$.MODULE$.apply(MediaTypes$.MODULE$.text$divevent$minusstream()), ScalaRunTime$.MODULE$.wrapRefArray(new MediaRange[0]))), (httpRequest, str) -> {
            return httpRequest.addHeader(Last$minusEvent$minusID$.MODULE$.apply(str));
        }))).map(httpResponse -> {
            return Coders$.MODULE$.Gzip().decodeMessage(httpResponse);
        }, actorSystem.dispatcher()).flatMap(httpResponse2 -> {
            return Unmarshal$.MODULE$.apply(httpResponse2).to(Unmarshaller$.MODULE$.messageUnmarshallerFromEntityUnmarshaller(EventStreamUnmarshalling$.MODULE$.fromEventsStream(actorSystem)), actorSystem.dispatcher(), Materializer$.MODULE$.matFromSystem(actorSystem));
        }, actorSystem.dispatcher()).fallbackTo(Future$.MODULE$.successful(org$apache$pekko$stream$connectors$sse$scaladsl$EventSource$$$noEvents));
    }

    private final Source recover$1(Source source) {
        return source.recoverWithRetries(1, new EventSource$$anon$1());
    }

    private final Source delimit$1(Source source) {
        return source.concat(singleDelimiter);
    }

    private final Option $anonfun$5$$anonfun$1(Option option) {
        return option;
    }
}
