package cloud.orbit.actors.client.streams;

import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.peer.streams.ClientSideStreamProxy;
import cloud.orbit.actors.peer.streams.ServerSideStreamProxy;
import cloud.orbit.actors.runtime.BasicRuntime;
import cloud.orbit.actors.runtime.DefaultClassDictionary;
import cloud.orbit.actors.runtime.InternalUtils;
import cloud.orbit.actors.streams.AsyncObserver;
import cloud.orbit.actors.streams.AsyncStream;
import cloud.orbit.actors.streams.StreamSequenceToken;
import cloud.orbit.actors.streams.StreamSubscriptionHandle;
import cloud.orbit.concurrent.Task;
import cloud.orbit.lifecycle.Startable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/* loaded from: input_file:cloud/orbit/actors/client/streams/ClientSideStreamProxyImpl.class */
public class ClientSideStreamProxyImpl implements ClientSideStreamProxy, Startable {
    private ConcurrentMap<StreamKey, ConcurrentMap<Handle, AsyncObserver>> observerMap = new ConcurrentHashMap();
    static AtomicLong nextId = new AtomicLong();
    private ClientSideStreamProxy localReference;
    private BasicRuntime runtime;

    @Override // cloud.orbit.actors.peer.streams.ClientSideStreamProxy
    public Task<Void> onNext(String str, String str2, Object obj) {
        ConcurrentMap<Handle, AsyncObserver> concurrentMap = this.observerMap.get(new StreamKey(str, obj.getClass(), str2));
        if (concurrentMap != null) {
            concurrentMap.values().forEach(asyncObserver -> {
                asyncObserver.onNext(obj, (StreamSequenceToken) null);
            });
        }
        return Task.done();
    }

    public <T> Task<Void> unsubscribe(StreamSubscriptionHandle<T> streamSubscriptionHandle) {
        ConcurrentMap<Handle, AsyncObserver> concurrentMap = this.observerMap.get(((Handle) streamSubscriptionHandle).key);
        if (concurrentMap == null || concurrentMap.remove(streamSubscriptionHandle) == null) {
            throw new IllegalStateException("not subscribed: " + streamSubscriptionHandle);
        }
        return Task.done();
    }

    public void setRuntime(BasicRuntime basicRuntime) {
        this.runtime = basicRuntime;
    }

    public Task<?> start() {
        this.localReference = this.runtime.registerObserver(ClientSideStreamProxy.class, (String) null, this);
        return Task.done();
    }

    public <T> AsyncStream<T> getStream(final String str, final Class<T> cls, final String str2) {
        return new AsyncStream<T>() { // from class: cloud.orbit.actors.client.streams.ClientSideStreamProxyImpl.1
            public Task<Void> unsubscribe(StreamSubscriptionHandle<T> streamSubscriptionHandle) {
                return ClientSideStreamProxyImpl.this.unsubscribe(streamSubscriptionHandle);
            }

            public Task<StreamSubscriptionHandle<T>> subscribe(AsyncObserver<T> asyncObserver, StreamSequenceToken streamSequenceToken) {
                return ClientSideStreamProxyImpl.this.subscribe(str, cls, str2, asyncObserver);
            }

            public Task<Void> publish(T t) {
                return null;
            }
        };
    }

    private static /* synthetic */ Task lambda$checkCast$cloud_orbit_concurrent_Task(CompletionStage completionStage) {
        if (completionStage instanceof Task) {
            return (Task) completionStage;
        }
        Task task = new Task();
        completionStage.whenComplete((obj, th) -> {
            if (th != null) {
                task.completeExceptionally(th);
            } else {
                task.complete(obj);
            }
        });
        return task;
    }

    public <T> Task<StreamSubscriptionHandle<T>> subscribe(String str, Class<T> cls, String str2, AsyncObserver<T> asyncObserver) {
        StreamKey streamKey = new StreamKey(str, cls, str2);
        ConcurrentMap<Handle, AsyncObserver> concurrentMap = this.observerMap.get(streamKey);
        if (concurrentMap == null) {
            concurrentMap = (ConcurrentMap) InternalUtils.putIfAbsentAndGet(this.observerMap, streamKey, new ConcurrentHashMap());
            ServerSideStreamProxy serverSideStreamProxy = (ServerSideStreamProxy) this.runtime.getRemoteObserverReference((NodeAddress) null, ServerSideStreamProxy.class, "0");
            int classId = DefaultClassDictionary.get().getClassId(cls);
            Task<StreamSubscriptionHandle<T>> subscribe = serverSideStreamProxy.subscribe(str, classId, str2, this.localReference);
            if (!subscribe.toCompletableFuture().isDone()) {
                int i = 1;
                return lambda$checkCast$cloud_orbit_concurrent_Task(subscribe.exceptionally(Function.identity()).thenCompose((v12) -> {
                    return async$subscribe(r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, v12);
                }).toCompletableFuture());
            }
            subscribe.toCompletableFuture().join();
        }
        ConcurrentMap<Handle, AsyncObserver> concurrentMap2 = concurrentMap != null ? concurrentMap : (ConcurrentMap) InternalUtils.putIfAbsentAndGet(this.observerMap, streamKey, new ConcurrentHashMap());
        Handle handle = new Handle(streamKey, nextId.incrementAndGet());
        concurrentMap2.put(handle, asyncObserver);
        return Task.fromValue(handle);
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:2:0x0002. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:13:0x00c8  */
    /* JADX WARN: Removed duplicated region for block: B:16:0x00cd  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private static java.util.concurrent.CompletableFuture async$subscribe(cloud.orbit.actors.client.streams.ClientSideStreamProxyImpl r22, java.lang.String r23, java.lang.Class r24, java.lang.String r25, cloud.orbit.actors.streams.AsyncObserver r26, cloud.orbit.actors.client.streams.StreamKey r27, java.util.concurrent.ConcurrentMap r28, cloud.orbit.actors.peer.streams.ServerSideStreamProxy r29, int r30, cloud.orbit.concurrent.Task r31, cloud.orbit.concurrent.Task r32, int r33, java.lang.Object r34) {
        /*
            Method dump skipped, instructions count: 274
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: cloud.orbit.actors.client.streams.ClientSideStreamProxyImpl.async$subscribe(cloud.orbit.actors.client.streams.ClientSideStreamProxyImpl, java.lang.String, java.lang.Class, java.lang.String, cloud.orbit.actors.streams.AsyncObserver, cloud.orbit.actors.client.streams.StreamKey, java.util.concurrent.ConcurrentMap, cloud.orbit.actors.peer.streams.ServerSideStreamProxy, int, cloud.orbit.concurrent.Task, cloud.orbit.concurrent.Task, int, java.lang.Object):java.util.concurrent.CompletableFuture");
    }
}
