package cloud.orbit.actors.server.streams;

import cloud.orbit.actors.Stage;
import cloud.orbit.actors.peer.streams.ClientSideStreamProxy;
import cloud.orbit.actors.peer.streams.ServerSideStreamProxy;
import cloud.orbit.actors.runtime.DefaultClassDictionary;
import cloud.orbit.actors.server.ServerPeer;
import cloud.orbit.actors.streams.AsyncObserver;
import cloud.orbit.actors.streams.AsyncStream;
import cloud.orbit.actors.streams.StreamSequenceToken;
import cloud.orbit.actors.streams.StreamSubscriptionHandle;
import cloud.orbit.actors.streams.simple.StreamReference;
import cloud.orbit.actors.transactions.IdUtils;
import cloud.orbit.concurrent.Task;
import cloud.orbit.lifecycle.Startable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/* loaded from: input_file:cloud/orbit/actors/server/streams/ServerSideStreamProxyImpl.class */
public class ServerSideStreamProxyImpl implements ServerSideStreamProxy, Startable {
    private Stage stage;
    private ConcurrentMap<StreamSubscriptionHandle, SubscriptionInfo> handleMap = new ConcurrentHashMap();
    private ServerPeer peer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: cloud.orbit.actors.server.streams.ServerSideStreamProxyImpl$1, reason: invalid class name */
    /* loaded from: input_file:cloud/orbit/actors/server/streams/ServerSideStreamProxyImpl$1.class */
    public class AnonymousClass1 implements AsyncObserver {
        final /* synthetic */ ClientSideStreamProxy val$proxy;
        final /* synthetic */ String val$provider;
        final /* synthetic */ String val$streamId;

        AnonymousClass1(ClientSideStreamProxy clientSideStreamProxy, String str, String str2) {
            this.val$proxy = clientSideStreamProxy;
            this.val$provider = str;
            this.val$streamId = str2;
        }

        public Task<Void> onNext(Object obj, StreamSequenceToken streamSequenceToken) {
            return this.val$proxy.onNext(this.val$provider, this.val$streamId, obj);
        }
    }

    @Override // cloud.orbit.actors.peer.streams.ServerSideStreamProxy
    public <T> Task<Void> unsubscribe(StreamSubscriptionHandle<T> streamSubscriptionHandle) {
        SubscriptionInfo remove = this.handleMap.remove(streamSubscriptionHandle);
        if (remove == null) {
            throw new IllegalArgumentException("Not subscribed " + streamSubscriptionHandle);
        }
        return remove.stream.unsubscribe(remove.actualHandle);
    }

    public Task<?> start() {
        this.peer.registerObserver(ServerSideStreamProxy.class, "0", this);
        return Task.done();
    }

    public Task<?> stop() {
        return Task.allOf(this.handleMap.values().stream().map(subscriptionInfo -> {
            return subscriptionInfo.stream.unsubscribe(subscriptionInfo.actualHandle);
        }));
    }

    public void setStage(Stage stage) {
        this.stage = stage;
    }

    public void setPeer(ServerPeer serverPeer) {
        this.peer = serverPeer;
    }

    private static /* synthetic */ Task lambda$checkCast$cloud_orbit_concurrent_Task(CompletionStage completionStage) {
        if (completionStage instanceof Task) {
            return (Task) completionStage;
        }
        Task task = new Task();
        completionStage.whenComplete((obj, th) -> {
            if (th != null) {
                task.completeExceptionally(th);
            } else {
                task.complete(obj);
            }
        });
        return task;
    }

    @Override // cloud.orbit.actors.peer.streams.ServerSideStreamProxy
    public <T> Task<StreamSubscriptionHandle<T>> subscribe(String str, int i, String str2, ClientSideStreamProxy clientSideStreamProxy) {
        Class<T> classById = DefaultClassDictionary.get().getClassById(i);
        AsyncStream<T> stream = this.stage.getStream(str, classById, str2);
        Task subscribe = stream.subscribe(new AnonymousClass1(clientSideStreamProxy, str, str2), (StreamSequenceToken) null);
        if (!subscribe.toCompletableFuture().isDone()) {
            int i2 = 1;
            return lambda$checkCast$cloud_orbit_concurrent_Task(subscribe.exceptionally(Function.identity()).thenCompose((v10) -> {
                return async$subscribe(r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, v10);
            }).toCompletableFuture());
        }
        subscribe.toCompletableFuture().join();
        StreamReference.SimpleStreamHandle simpleStreamHandle = new StreamReference.SimpleStreamHandle(String.valueOf(IdUtils.sequentialLongId()));
        this.handleMap.putIfAbsent(simpleStreamHandle, new SubscriptionInfo(stream, (StreamSubscriptionHandle) subscribe.join(), clientSideStreamProxy));
        return Task.fromValue(simpleStreamHandle);
    }

    private static CompletableFuture async$subscribe(ServerSideStreamProxyImpl serverSideStreamProxyImpl, String str, int i, String str2, ClientSideStreamProxy clientSideStreamProxy, Class cls, AsyncStream asyncStream, Task task, Task task2, int i2, Object obj) {
        Task task3;
        switch (i2) {
            case 0:
                Class classById = DefaultClassDictionary.get().getClassById(i);
                asyncStream = serverSideStreamProxyImpl.stage.getStream(str, classById, str2);
                task = asyncStream.subscribe(new AnonymousClass1(clientSideStreamProxy, str, str2), (StreamSequenceToken) null);
                task3 = task;
                if (!task3.toCompletableFuture().isDone()) {
                    int i3 = 1;
                    return task3.exceptionally(Function.identity()).thenCompose((v10) -> {
                        return async$subscribe(r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, v10);
                    }).toCompletableFuture();
                }
                break;
            case 1:
                task3 = task2;
                break;
            default:
                throw new IllegalArgumentException();
        }
        task3.toCompletableFuture().join();
        StreamReference.SimpleStreamHandle simpleStreamHandle = new StreamReference.SimpleStreamHandle(String.valueOf(IdUtils.sequentialLongId()));
        serverSideStreamProxyImpl.handleMap.putIfAbsent(simpleStreamHandle, new SubscriptionInfo(asyncStream, (StreamSubscriptionHandle) task.join(), clientSideStreamProxy));
        return Task.fromValue(simpleStreamHandle);
    }
}
